refactor to source based playlist engine framework

This commit is contained in:
aj 2019-09-23 21:08:04 +01:00
parent 91ff0e1bce
commit c34eb4d8d9
7 changed files with 284 additions and 120 deletions

View File

@ -1,7 +1,6 @@
import requests
import os
import logging
import copy
from abc import ABC, abstractmethod
import spotframework.util.monthstrings as monthstrings
from spotframework.engine.processor.added import AddedSince
@ -18,129 +17,75 @@ from requests.models import Response
logger = logging.getLogger(__name__)
class SourceParameter:
def __init__(self,
source_type,
processors: List[AbstractProcessor] = None):
self.processors = processors if processors is not None else []
self.source_type = source_type
class PlaylistEngine:
def __init__(self, net: Network):
self.playlists = []
self.library_tracks = []
self.sources = []
self.net = net
def load_user_playlists(self) -> None:
logger.info('loading')
def init_default_sources(self):
self.sources = [PlaylistSource(self.net), RecommendationSource(self.net)]
playlists = self.net.get_playlists()
if playlists and len(playlists) > 0:
self.playlists = playlists
def check_for_source(self, class_type) -> bool:
source = next((i for i in self.sources if isinstance(i, class_type)), None)
if source:
return True
else:
logger.error('error getting playlists')
return False
def append_user_playlists(self) -> None:
logger.info('loading')
playlists = self.net.get_playlists()
if playlists and len(playlists) > 0:
self.playlists += playlists
else:
logger.error('error getting playlists')
def load_library_tracks(self, track_limit: int = None) -> None:
logger.info('loading')
if track_limit:
tracks = self.net.get_library_tracks(response_limit=track_limit)
else:
tracks = self.net.get_library_tracks()
if tracks and len(tracks) > 0:
self.library_tracks = tracks
def get_playlist_tracks(self,
playlist: SpotifyPlaylist) -> None:
logger.info(f"pulling tracks for {playlist.name}")
tracks = self.net.get_playlist_tracks(playlist.uri)
if tracks and len(tracks) > 0:
playlist.tracks = tracks
else:
logger.error('error getting tracks')
def load_playlist_tracks(self, name: str):
playlist = next((i for i in self.playlists if i.name == name), None)
if playlist is not None:
self.get_playlist_tracks(playlist)
else:
logger.error(f'playlist {name} not found')
def get_source(self, class_type):
return next((i for i in self.sources if isinstance(i, class_type)), None)
def make_playlist(self,
playlist_parts: List[str],
processors: List[AbstractProcessor] = None,
include_recommendations: bool = False,
recommendation_limit: int = 10,
include_library_tracks: bool = False,
library_processors: List[AbstractProcessor] = None) -> List[SpotifyTrack]:
if processors is None:
processors = []
if library_processors is None:
library_processors = []
params: List[SourceParameter],
processors: List[AbstractProcessor] = None) -> List[SpotifyTrack]:
tracks = []
for part in playlist_parts:
play = next((i for i in self.playlists if i.name == part), None)
if play is not None:
if play.has_tracks() is False:
self.get_playlist_tracks(play)
playlist_tracks = copy.deepcopy(play.tracks)
for processor in [i for i in processors if i.has_targets()]:
if play.name in [i for i in processor.playlist_names]:
playlist_tracks = processor.process(playlist_tracks)
tracks += [i for i in playlist_tracks if i.is_local is False]
for param in params:
source = next((i for i in self.sources if isinstance(i, param.source_type)), None)
if source:
if source.loaded is False:
source.load()
if isinstance(source, RecommendationSource) and isinstance(param, RecommendationSource.Params):
tracks += source.process(params=param, uris=[i.uri for i in tracks])
else:
logger.warning(f"requested playlist {part} not found")
if 'SLACKHOOK' in os.environ:
requests.post(os.environ['SLACKHOOK'], json={"text": f"spot playlists: {part} not found"})
tracks += source.process(params=param)
else:
new_source = param.source_type(net=self.net)
new_source.load()
self.sources.append(new_source)
for processor in [i for i in processors if i.has_targets() is False]:
if isinstance(new_source, RecommendationSource) and isinstance(param, RecommendationSource.Params):
tracks += new_source.process(params=param, uris=[i.uri for i in tracks])
else:
tracks += new_source.process(params=param)
logger.info(f'adding {str(param.source_type)} source')
if processors:
for processor in processors:
tracks = processor.process(tracks)
if include_library_tracks:
library_tracks = copy.deepcopy(self.library_tracks)
for processor in library_processors:
library_tracks = processor.process(library_tracks)
tracks += library_tracks
if include_recommendations:
recommendations = self.net.get_recommendations(tracks=[i.uri.object_id for i in tracks],
response_limit=recommendation_limit)
if recommendations and len(recommendations) > 0:
tracks += recommendations
else:
logger.error('error getting recommendations')
return tracks
def get_recent_playlist(self,
params: List[SourceParameter],
boundary_date: datetime,
recent_playlist_parts: List[str],
processors: List[AbstractProcessor] = None,
include_recommendations: bool = False,
recommendation_limit: int = 10,
add_this_month: bool = False,
add_last_month: bool = False) -> List[SpotifyTrack]:
if processors is None:
processors = []
this_month = monthstrings.get_this_month()
last_month = monthstrings.get_last_month()
@ -152,14 +97,12 @@ class PlaylistEngine:
if add_last_month:
month_playlists.append(last_month)
datefilter = AddedSince(boundary_date, recent_playlist_parts + month_playlists)
if PlaylistSource in [i.source_type for i in params]:
processors.append(datefilter)
param = next((i for i in params if i.source_type == PlaylistSource), None)
param.names += month_playlists
return self.make_playlist(recent_playlist_parts + month_playlists,
processors,
include_recommendations=include_recommendations,
recommendation_limit=recommendation_limit)
return self.make_playlist(params=params, processors=processors + [AddedSince(boundary_date)])
def reorder_playlist_by_added_date(self,
name: str = None,
@ -169,10 +112,20 @@ class PlaylistEngine:
logger.error('no playlist name or id provided')
raise ValueError('no playlist name or id provided')
if name:
playlist = next((i for i in self.playlists if i.name == name), None)
playlist_source = self.get_source(PlaylistSource)
if playlist_source:
if playlist_source.loaded is False:
playlist_source.load()
else:
playlist = next((i for i in self.playlists if i.uri == uri), None)
playlist_source = PlaylistSource(self.net)
playlist_source.load()
self.sources.append(playlist_source)
if name:
playlist = next((i for i in playlist_source.playlists if i.name == name), None)
else:
playlist = next((i for i in playlist_source.playlists if i.uri == uri), None)
if playlist is None:
logger.error('playlist not found')
@ -225,3 +178,191 @@ class PlaylistEngine:
return resp
else:
logger.error('error changing description')
class TrackSource(ABC):
def __init__(self, net: Network):
self.net = net
self.loaded = False
@abstractmethod
def load(self) -> None:
self.loaded = True
@abstractmethod
def process(self, params: SourceParameter) -> List[SpotifyTrack]:
pass
class PlaylistSource(TrackSource):
class Params(SourceParameter):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
processors: List[AbstractProcessor] = None):
self.names = names if names is not None else []
self.uris = uris if uris is not None else []
super().__init__(processors=processors, source_type=PlaylistSource)
def __init__(self,
net: Network):
self.playlists = []
super().__init__(net)
def append_user_playlists(self) -> None:
logger.info('appending user playlists')
playlists = self.net.get_playlists()
if playlists and len(playlists) > 0:
self.playlists += playlists
else:
logger.error('error getting playlists')
def get_playlist_tracks(self,
playlist: SpotifyPlaylist) -> None:
logger.info(f"pulling tracks for {playlist.name}")
tracks = self.net.get_playlist_tracks(playlist.uri)
if tracks and len(tracks) > 0:
playlist.tracks = tracks
else:
logger.error('error getting tracks')
def load(self) -> None:
logger.info('loading user playlists')
playlists = self.net.get_playlists()
if playlists and len(playlists) > 0:
self.playlists = playlists
else:
logger.error('error getting playlists')
super().load()
def process(self, params: Params) -> List[SpotifyTrack]:
playlists = []
for name in params.names:
playlist = next((i for i in self.playlists if i.name == name), None)
if playlist is not None:
playlists.append(playlist)
else:
logger.warning(f'could not find playlist {name}')
for uri in params.uris:
playlist = next((i for i in self.playlists if i.uri == uri), None)
if playlist:
playlists.append(playlist)
else:
playlist = self.net.get_playlist(uri)
if playlist:
playlists.append(playlist)
self.playlists.append(playlist)
else:
logger.warning(f'could not find playlist {uri}')
tracks = []
for playlist in playlists:
if playlist.has_tracks() is False:
self.get_playlist_tracks(playlist)
playlist_tracks = copy.deepcopy(playlist.tracks)
for processor in [i for i in params.processors if i.has_targets()]:
if playlist.name in [i for i in processor.playlist_names]\
or playlist.uri in [i for i in processor.playlist_uris]:
playlist_tracks = processor.process(playlist_tracks)
tracks += [i for i in playlist_tracks if i.is_local is False]
for processor in [i for i in params.processors if i.has_targets() is False]:
tracks = processor.process(tracks)
return tracks
class LibraryTrackSource(TrackSource):
class Params(SourceParameter):
def __init__(self, processors: List[AbstractProcessor] = None):
super().__init__(processors=processors, source_type=LibraryTrackSource)
def __init__(self,
net: Network):
self.tracks = []
super().__init__(net)
def load(self) -> None:
logger.info('loading library tracks')
tracks = self.net.get_library_tracks()
if tracks and len(tracks) > 0:
self.tracks = tracks
else:
logger.error('error getting tracks')
super().load()
def process(self, params: SourceParameter) -> List[SpotifyTrack]:
tracks = copy.deepcopy(self.tracks)
for index, track in enumerate(tracks):
for processor in [i for i in params.processors if i.playlist_uris]:
if track.uri in [i for i in processor.playlist_uris]:
new_track = processor.process([track])
if new_track and len(new_track) > 0:
tracks[index] = new_track[0]
else:
tracks[index] = None
tracks = [i for i in tracks if i is not None]
for processor in [i for i in params.processors if i.has_targets() is False]:
tracks = processor.process(tracks)
return tracks
class RecommendationSource(TrackSource):
class Params(SourceParameter):
def __init__(self,
uris: List[Uri] = None,
recommendation_limit: int = None,
processors: List[AbstractProcessor] = None):
self.uris = uris
self.recommendation_limit = recommendation_limit if recommendation_limit is not None else 10
super().__init__(processors=processors, source_type=RecommendationSource)
def load(self):
super().load()
def process(self, params: Params, uris: List[Uri] = None):
query_uris = []
if params.uris is not None:
query_uris += params.uris
if uris is not None:
query_uris += uris
if len(query_uris) > 0:
recommendations = self.net.get_recommendations(tracks=[i.object_id for i in query_uris
if i.object_type == Uri.ObjectType.track],
response_limit=params.recommendation_limit)
if recommendations and len(recommendations) > 0:
pass
else:
logger.error('error getting recommendations')
return recommendations
else:
logger.error('no uris to get recommendations for')

View File

@ -1,15 +1,19 @@
from abc import ABC, abstractmethod
from typing import List
from spotframework.model.track import Track
from spotframework.model.uri import Uri
class AbstractProcessor(ABC):
def __init__(self, names: List[str] = None):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None):
self.playlist_names = names
self.playlist_uris = uris
def has_targets(self) -> bool:
if self.playlist_names:
if self.playlist_names or self.playlist_uris:
return True
else:
return False
@ -36,10 +40,14 @@ class BatchSingleTypeAwareProcessor(BatchSingleProcessor, ABC):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
instance_check=None,
append_malformed: bool = True):
super().__init__(names)
super().__init__(names, uris)
if isinstance(instance_check, list):
self.instance_check = instance_check
else:
self.instance_check = [instance_check]
self.append_malformed = append_malformed
def process(self, tracks: List[Track]) -> List[Track]:
@ -50,7 +58,7 @@ class BatchSingleTypeAwareProcessor(BatchSingleProcessor, ABC):
for track in tracks:
if isinstance(track, self.instance_check):
if any(isinstance(track, i) for i in self.instance_check):
return_tracks.append(track)
else:
malformed_tracks.append(track)

View File

@ -1,7 +1,8 @@
from .abstract import BatchSingleTypeAwareProcessor
import datetime
from typing import List
from spotframework.model.track import PlaylistTrack
from spotframework.model.track import PlaylistTrack, LibraryTrack
from spotframework.model.uri import Uri
from typing import Optional
@ -10,9 +11,11 @@ class Added(BatchSingleTypeAwareProcessor):
def __init__(self,
boundary: datetime.datetime,
names: List[str] = None,
uris: List[Uri] = None,
append_malformed: bool = True):
super().__init__(names,
instance_check=PlaylistTrack,
super().__init__(names=names,
uris=uris,
instance_check=[PlaylistTrack, LibraryTrack],
append_malformed=append_malformed)
self.boundary = boundary

View File

@ -1,14 +1,17 @@
from spotframework.engine.processor.abstract import BatchSingleProcessor, BatchSingleTypeAwareProcessor
from typing import List
from spotframework.model.track import Track, SpotifyTrack
from spotframework.model.uri import Uri
class DeduplicateByID(BatchSingleTypeAwareProcessor):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
append_malformed: bool = True):
super().__init__(names,
super().__init__(names=names,
uris=uris,
instance_check=SpotifyTrack,
append_malformed=append_malformed)

View File

@ -1,15 +1,18 @@
from spotframework.engine.processor.abstract import BatchSingleTypeAwareProcessor
from typing import List
from spotframework.model.track import SpotifyTrack
from spotframework.model.uri import Uri
class SortPopularity(BatchSingleTypeAwareProcessor):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
append_malformed: bool = True,
reverse: bool = False):
super().__init__(names,
super().__init__(names=names,
uris=uris,
instance_check=SpotifyTrack,
append_malformed=append_malformed)
self.reverse = reverse

View File

@ -2,6 +2,7 @@ from .abstract import AbstractProcessor
import random
from typing import List
from spotframework.model.track import Track
from spotframework.model.uri import Uri
class Shuffle(AbstractProcessor):
@ -15,8 +16,9 @@ class RandomSample(Shuffle):
def __init__(self,
sample_size: int,
names: List[str] = None):
super().__init__(names)
names: List[str] = None,
uris: List[Uri] = None):
super().__init__(names=names, uris=uris)
self.sample_size = sample_size
def process(self, tracks: List[Track]) -> List[Track]:

View File

@ -2,13 +2,15 @@ from abc import ABC
from .abstract import AbstractProcessor, BatchSingleTypeAwareProcessor
from typing import List
from spotframework.model.track import Track, PlaylistTrack
from spotframework.model.uri import Uri
class BasicReversibleSort(AbstractProcessor, ABC):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
reverse: bool = False):
super().__init__(names)
super().__init__(names=names, uris=uris)
self.reverse = reverse
@ -30,9 +32,11 @@ class SortAddedDate(BatchSingleTypeAwareProcessor):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
reverse: bool = False,
append_malformed: bool = True):
super().__init__(names=names,
uris=uris,
instance_check=PlaylistTrack,
append_malformed=append_malformed)
self.reverse = reverse