Redid model to reflect service, using dataclasses

This commit is contained in:
aj 2020-06-21 15:28:29 +01:00
parent 144f198424
commit dd802a0d8f
21 changed files with 681 additions and 1162 deletions

View File

@ -6,8 +6,8 @@ import spotframework.util.monthstrings as monthstrings
from spotframework.engine.processor.added import AddedSince from spotframework.engine.processor.added import AddedSince
from typing import List, Optional from typing import List, Optional
from spotframework.model.track import SpotifyTrack from spotframework.model.track import TrackFull
from spotframework.model.playlist import SpotifyPlaylist from spotframework.model.playlist import FullPlaylist
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
from spotframework.net.network import Network from spotframework.net.network import Network
from spotframework.engine.processor.abstract import AbstractProcessor from spotframework.engine.processor.abstract import AbstractProcessor
@ -47,7 +47,7 @@ class PlaylistEngine:
def make_playlist(self, def make_playlist(self,
params: List[SourceParameter], params: List[SourceParameter],
processors: List[AbstractProcessor] = None) -> List[SpotifyTrack]: processors: List[AbstractProcessor] = None) -> List[TrackFull]:
tracks = [] tracks = []
@ -77,7 +77,7 @@ class PlaylistEngine:
boundary_date: datetime, boundary_date: datetime,
processors: List[AbstractProcessor] = None, processors: List[AbstractProcessor] = None,
add_this_month: bool = False, add_this_month: bool = False,
add_last_month: bool = False) -> List[SpotifyTrack]: add_last_month: bool = False) -> List[TrackFull]:
if processors is None: if processors is None:
processors = [] processors = []
@ -151,7 +151,7 @@ class PlaylistEngine:
tracks_to_sort.remove(counter_track) tracks_to_sort.remove(counter_track)
def execute_playlist(self, def execute_playlist(self,
tracks: List[SpotifyTrack], tracks: List[TrackFull],
uri: Uri) -> Optional[Response]: uri: Uri) -> Optional[Response]:
resp = self.net.replace_playlist_tracks(uri=uri, uris=[i.uri for i in tracks]) resp = self.net.replace_playlist_tracks(uri=uri, uris=[i.uri for i in tracks])
@ -197,7 +197,7 @@ class TrackSource(ABC):
self.loaded = True self.loaded = True
@abstractmethod @abstractmethod
def process(self, params: SourceParameter) -> List[SpotifyTrack]: def process(self, params: SourceParameter) -> List[TrackFull]:
pass pass
@ -227,7 +227,7 @@ class PlaylistSource(TrackSource):
logger.error('error getting playlists') logger.error('error getting playlists')
def get_playlist_tracks(self, def get_playlist_tracks(self,
playlist: SpotifyPlaylist) -> None: playlist: FullPlaylist) -> None:
logger.info(f"pulling tracks for {playlist.name}") logger.info(f"pulling tracks for {playlist.name}")
tracks = self.net.get_playlist_tracks(playlist.uri) tracks = self.net.get_playlist_tracks(playlist.uri)
@ -247,7 +247,7 @@ class PlaylistSource(TrackSource):
super().load() super().load()
def process(self, params: Params) -> List[SpotifyTrack]: def process(self, params: Params) -> List[TrackFull]:
playlists = [] playlists = []
@ -313,7 +313,7 @@ class LibraryTrackSource(TrackSource):
super().load() super().load()
def process(self, params: SourceParameter) -> List[SpotifyTrack]: def process(self, params: SourceParameter) -> List[TrackFull]:
tracks = copy.deepcopy(self.tracks) tracks = copy.deepcopy(self.tracks)

View File

@ -1,6 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List from typing import List
from spotframework.model.track import Track from spotframework.model.track import SimplifiedTrack
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
@ -19,20 +19,20 @@ class AbstractProcessor(ABC):
return False return False
@abstractmethod @abstractmethod
def process(self, tracks: List[Track]) -> List[Track]: def process(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
pass pass
class BatchSingleProcessor(AbstractProcessor, ABC): class BatchSingleProcessor(AbstractProcessor, ABC):
@staticmethod @staticmethod
def process_single(track: Track) -> Track: def process_single(track: SimplifiedTrack) -> SimplifiedTrack:
return track return track
def process_batch(self, tracks: List[Track]) -> List[Track]: def process_batch(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
return [self.process_single(track) for track in tracks] return [self.process_single(track) for track in tracks]
def process(self, tracks: List[Track]) -> List[Track]: def process(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
return [i for i in self.process_batch(tracks) if i is not None] return [i for i in self.process_batch(tracks) if i is not None]
@ -50,7 +50,7 @@ class BatchSingleTypeAwareProcessor(BatchSingleProcessor, ABC):
self.instance_check = [instance_check] self.instance_check = [instance_check]
self.append_malformed = append_malformed self.append_malformed = append_malformed
def process(self, tracks: List[Track]) -> List[Track]: def process(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
if self.instance_check: if self.instance_check:
return_tracks = [] return_tracks = []

View File

@ -1,7 +1,7 @@
from spotframework.engine.processor.abstract import BatchSingleProcessor from spotframework.engine.processor.abstract import BatchSingleProcessor
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List from typing import List
from spotframework.model.track import SpotifyTrack from spotframework.model.track import TrackFull
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
@ -15,14 +15,14 @@ class AudioFeaturesProcessor(BatchSingleProcessor, ABC):
uris=uris) uris=uris)
self.append_malformed = append_malformed self.append_malformed = append_malformed
def process(self, tracks: List[SpotifyTrack]) -> List[SpotifyTrack]: def process(self, tracks: List[TrackFull]) -> List[TrackFull]:
return_tracks = [] return_tracks = []
malformed_tracks = [] malformed_tracks = []
for track in tracks: for track in tracks:
if isinstance(track, SpotifyTrack) and track.audio_features is not None: if isinstance(track, TrackFull) and track.audio_features is not None:
return_tracks.append(track) return_tracks.append(track)
else: else:
malformed_tracks.append(track) malformed_tracks.append(track)
@ -50,10 +50,10 @@ class FloatFilter(AudioFeaturesProcessor, ABC):
self.greater_than = greater_than self.greater_than = greater_than
@abstractmethod @abstractmethod
def get_variable_value(self, track: SpotifyTrack) -> float: def get_variable_value(self, track: TrackFull) -> float:
pass pass
def process_single(self, track: SpotifyTrack): def process_single(self, track: TrackFull):
if self.greater_than: if self.greater_than:
if self.get_variable_value(track) > self.boundary: if self.get_variable_value(track) > self.boundary:
return track return track
@ -67,25 +67,25 @@ class FloatFilter(AudioFeaturesProcessor, ABC):
class EnergyFilter(FloatFilter): class EnergyFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float: def get_variable_value(self, track: TrackFull) -> float:
return track.audio_features.energy return track.audio_features.energy
class ValenceFilter(FloatFilter): class ValenceFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float: def get_variable_value(self, track: TrackFull) -> float:
return track.audio_features.valence return track.audio_features.valence
class TempoFilter(FloatFilter): class TempoFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float: def get_variable_value(self, track: TrackFull) -> float:
return track.audio_features.tempo return track.audio_features.tempo
class DanceabilityFilter(FloatFilter): class DanceabilityFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float: def get_variable_value(self, track: TrackFull) -> float:
return track.audio_features.danceability return track.audio_features.danceability
class AcousticnessFilter(FloatFilter): class AcousticnessFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float: def get_variable_value(self, track: TrackFull) -> float:
return track.audio_features.acousticness return track.audio_features.acousticness

View File

@ -1,7 +1,7 @@
from spotframework.engine.processor.abstract import BatchSingleProcessor, BatchSingleTypeAwareProcessor from spotframework.engine.processor.abstract import BatchSingleProcessor, BatchSingleTypeAwareProcessor
from typing import List from typing import List
import logging import logging
from spotframework.model.track import Track, SpotifyTrack from spotframework.model.track import SimplifiedTrack, TrackFull
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -15,10 +15,10 @@ class DeduplicateByID(BatchSingleTypeAwareProcessor):
append_malformed: bool = True): append_malformed: bool = True):
super().__init__(names=names, super().__init__(names=names,
uris=uris, uris=uris,
instance_check=SpotifyTrack, instance_check=TrackFull,
append_malformed=append_malformed) append_malformed=append_malformed)
def process_batch(self, tracks: List[SpotifyTrack]) -> List[SpotifyTrack]: def process_batch(self, tracks: List[TrackFull]) -> List[TrackFull]:
return_tracks = [] return_tracks = []
for track in tracks: for track in tracks:
@ -30,7 +30,7 @@ class DeduplicateByID(BatchSingleTypeAwareProcessor):
class DeduplicateByName(BatchSingleProcessor): class DeduplicateByName(BatchSingleProcessor):
def process_batch(self, tracks: List[Track]) -> List[Track]: def process_batch(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
return_tracks = [] return_tracks = []
for to_check in tracks: for to_check in tracks:

View File

@ -1,6 +1,6 @@
from spotframework.engine.processor.abstract import BatchSingleTypeAwareProcessor from spotframework.engine.processor.abstract import BatchSingleTypeAwareProcessor
from typing import List from typing import List
from spotframework.model.track import SpotifyTrack from spotframework.model.track import TrackFull
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
@ -13,10 +13,10 @@ class SortPopularity(BatchSingleTypeAwareProcessor):
reverse: bool = False): reverse: bool = False):
super().__init__(names=names, super().__init__(names=names,
uris=uris, uris=uris,
instance_check=SpotifyTrack, instance_check=TrackFull,
append_malformed=append_malformed) append_malformed=append_malformed)
self.reverse = reverse self.reverse = reverse
def process_batch(self, tracks: List[SpotifyTrack]) -> List[SpotifyTrack]: def process_batch(self, tracks: List[TrackFull]) -> List[TrackFull]:
tracks.sort(key=lambda x: x.popularity, reverse=self.reverse) tracks.sort(key=lambda x: x.popularity, reverse=self.reverse)
return tracks return tracks

View File

@ -1,13 +1,13 @@
from .abstract import AbstractProcessor from .abstract import AbstractProcessor
import random import random
from typing import List from typing import List
from spotframework.model.track import Track from spotframework.model.track import SimplifiedTrack
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
class Shuffle(AbstractProcessor): class Shuffle(AbstractProcessor):
def process(self, tracks: List[Track]) -> List[Track]: def process(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
random.shuffle(tracks) random.shuffle(tracks)
return tracks return tracks
@ -21,5 +21,5 @@ class RandomSample(Shuffle):
super().__init__(names=names, uris=uris) super().__init__(names=names, uris=uris)
self.sample_size = sample_size self.sample_size = sample_size
def process(self, tracks: List[Track]) -> List[Track]: def process(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
return super().process(tracks)[:self.sample_size] return super().process(tracks)[:self.sample_size]

View File

@ -1,7 +1,7 @@
from abc import ABC from abc import ABC
from .abstract import AbstractProcessor, BatchSingleTypeAwareProcessor from .abstract import AbstractProcessor, BatchSingleTypeAwareProcessor
from typing import List from typing import List
from spotframework.model.track import Track, PlaylistTrack from spotframework.model.track import SimplifiedTrack, PlaylistTrack
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
@ -16,7 +16,7 @@ class BasicReversibleSort(AbstractProcessor, ABC):
class SortReleaseDate(BasicReversibleSort): class SortReleaseDate(BasicReversibleSort):
def process(self, tracks: List[Track]) -> List[Track]: def process(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
tracks.sort(key=lambda x: (x.artists[0].name.lower(), tracks.sort(key=lambda x: (x.artists[0].name.lower(),
x.album.name.lower(), x.album.name.lower(),
x.track_number)) x.track_number))
@ -26,7 +26,7 @@ class SortReleaseDate(BasicReversibleSort):
class SortArtistName(BasicReversibleSort): class SortArtistName(BasicReversibleSort):
def process(self, tracks: List[Track]) -> List[Track]: def process(self, tracks: List[SimplifiedTrack]) -> List[SimplifiedTrack]:
tracks.sort(key=lambda x: (x.album.name.lower(), tracks.sort(key=lambda x: (x.album.name.lower(),
x.track_number)) x.track_number))
tracks.sort(key=lambda x: x.artists[0].name.lower(), reverse=self.reverse) tracks.sort(key=lambda x: x.artists[0].name.lower(), reverse=self.reverse)

View File

@ -1,4 +1,8 @@
from typing import List from typing import List
import logging
from spotframework.model.track import SimplifiedTrack, LibraryTrack, PlayedTrack, PlaylistTrack
logger = logging.getLogger(__name__)
def remove_local(tracks: List, include_malformed=True) -> List: def remove_local(tracks: List, include_malformed=True) -> List:
@ -14,3 +18,20 @@ def remove_local(tracks: List, include_malformed=True) -> List:
return_tracks.append(track) return_tracks.append(track)
return return_tracks return return_tracks
def get_track_objects(tracks: List) -> (List, List):
inner_tracks = []
whole_tracks = []
for track in tracks:
if isinstance(track, SimplifiedTrack):
inner_tracks.append(track)
whole_tracks.append(track)
elif isinstance(track, (PlaylistTrack, PlayedTrack, LibraryTrack)):
inner_tracks.append(track.track)
whole_tracks.append(track)
else:
logger.warning(f'invalid type found for {track} ({type(track)}), discarding')
return inner_tracks, whole_tracks

View File

@ -1,9 +1,9 @@
import logging import logging
from typing import List from typing import List
from spotframework.model.track import SpotifyTrack from spotframework.model.track import TrackFull
from spotframework.model.album import SpotifyAlbum
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
from spotframework.filter import get_track_objects
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -12,13 +12,13 @@ def deduplicate_by_id(tracks: List, include_malformed=True) -> List:
prop = 'uri' prop = 'uri'
return_tracks = [] return_tracks = []
for track in tracks: for inner_track, whole_track in zip(*get_track_objects(tracks)):
if hasattr(track, prop) and isinstance(getattr(track, prop), Uri): if hasattr(inner_track, prop) and isinstance(getattr(inner_track, prop), Uri):
if getattr(track, prop) not in [getattr(i, prop) for i in return_tracks]: if getattr(inner_track, prop) not in [getattr(i, prop) for i in return_tracks]:
return_tracks.append(track) return_tracks.append(whole_track)
else: else:
if include_malformed: if include_malformed:
return_tracks.append(track) return_tracks.append(whole_track)
return return_tracks return return_tracks
@ -26,35 +26,27 @@ def deduplicate_by_id(tracks: List, include_malformed=True) -> List:
def deduplicate_by_name(tracks: List, include_malformed=True) -> List: def deduplicate_by_name(tracks: List, include_malformed=True) -> List:
return_tracks = [] return_tracks = []
for track in tracks: for inner_track, whole_track in zip(*get_track_objects(tracks)):
if isinstance(track, SpotifyTrack): if isinstance(inner_track, TrackFull):
to_check_artists = [i.name.lower() for i in track.artists] to_check_artists = [i.name.lower() for i in inner_track.artists]
for index, _track in enumerate(return_tracks): for index, (_inner_track, _whole_track) in enumerate(zip(*get_track_objects(return_tracks))):
if track.name.lower() == _track.name.lower(): if inner_track.name.lower() == _inner_track.name.lower():
_track_artists = [i.name.lower() for i in _track.artists] _track_artists = [i.name.lower() for i in _inner_track.artists]
if all((i in _track_artists for i in to_check_artists)): # CHECK ARTISTS MATCH if all((i in _track_artists for i in to_check_artists)): # CHECK ARTISTS MATCH
if not isinstance(track.album, SpotifyAlbum):
logger.warning(f'{track.name} album not of type SpotifyAlbum')
continue
if not isinstance(_track.album, SpotifyAlbum):
logger.warning(f'{_track.name} album not of type SpotifyAlbum')
continue
# CHECK ALBUM TYPE, PREFER ALBUMS OVER SINGLES ETC # CHECK ALBUM TYPE, PREFER ALBUMS OVER SINGLES ETC
if track.album.album_type.value > _track.album.album_type.value: if inner_track.album.album_type.value > _inner_track.album.album_type.value:
logger.debug(f'better track source found, {track} ({track.album.album_type}) ' logger.debug(f'better track source found, {inner_track} ({inner_track.album.album_type}) '
f'> {_track} ({_track.album.album_type})') f'> {_inner_track} ({_inner_track.album.album_type})')
return_tracks[index] = track # REPLACE return_tracks[index] = whole_track # REPLACE
break # FOUND, ESCAPE break # FOUND, ESCAPE
else: else:
return_tracks.append(track) # NOT FOUND, ADD TO RETURN return_tracks.append(whole_track) # NOT FOUND, ADD TO RETURN
else: else:
if include_malformed: if include_malformed:
return_tracks.append(track) return_tracks.append(whole_track)
return return_tracks return return_tracks

View File

@ -1,47 +1,50 @@
import logging import logging
from typing import List from typing import List
from datetime import datetime
from spotframework.model.track import Track from spotframework.model.album import SimplifiedAlbum
from spotframework.filter import get_track_objects
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def sort_by_popularity(tracks: List, reverse: bool = False, include_malformed=False) -> List: def sort_by_popularity(tracks: List, reverse: bool = False) -> List:
prop = 'popularity' prop = 'popularity'
return [j for i, j
return_tracks = sorted([i for i in tracks if hasattr(i, prop) and isinstance(getattr(i, prop), int)], in sorted([(k, l) for k, l in zip(*get_track_objects(tracks))
key=lambda x: x.popularity, reverse=reverse) if hasattr(k, prop) and isinstance(getattr(k, prop), int)],
key=lambda x: x[0].popularity, reverse=reverse
if include_malformed: )
return_tracks += [i for i in tracks ]
if not hasattr(i, prop)
or (hasattr(i, prop) and not isinstance(getattr(i, prop), int))]
return return_tracks
def sort_by_release_date(tracks: List, reverse: bool = False) -> List: def sort_by_release_date(tracks: List, reverse: bool = False) -> List:
return sorted(sort_artist_album_track_number(tracks), return [j for i, j
key=lambda x: x.album.release_date, reverse=reverse) in sorted([(k, l) for k, l in sort_artist_album_track_number(tracks, inner_tracks_only=False)
if hasattr(k, 'album') and isinstance(getattr(k, 'album'), SimplifiedAlbum)],
key=lambda x: x[0].album.release_date, reverse=reverse
def sort_by_artist_name(tracks: List, reverse: bool = False) -> List: )
return_tracks = sorted([i for i in tracks if isinstance(i, Track)], ]
key=lambda x: (x.album.name.lower(),
x.track_number))
return_tracks.sort(key=lambda x: x.artists[0].name.lower(), reverse=reverse)
return return_tracks
def sort_by_added_date(tracks: List, reverse: bool = False) -> List: def sort_by_added_date(tracks: List, reverse: bool = False) -> List:
return sorted(sort_artist_album_track_number(tracks), return [j for i, j
key=lambda x: x.added_at, in sorted([(k, l) for k, l in sort_artist_album_track_number(tracks, inner_tracks_only=False)
reverse=reverse) if hasattr(l, 'added_at') and isinstance(getattr(l, 'added_at'), datetime)],
key=lambda x: x[1].added_at,
reverse=reverse
)
]
def sort_artist_album_track_number(tracks: List) -> List: def sort_artist_album_track_number(tracks: List, inner_tracks_only: bool = False) -> List:
return sorted([i for i in tracks if isinstance(i, Track)], sorted_tracks = sorted([(i, w) for i, w in zip(*get_track_objects(tracks))
key=lambda x: (x.artists[0].name.lower(), if hasattr(i, 'album') and isinstance(getattr(i, 'album'), SimplifiedAlbum)],
x.album.name.lower(), key=lambda x: (x[0].artists[0].name.lower(),
x.track_number)) x[0].album.name.lower(),
x[0].track_number))
if inner_tracks_only:
return [i for i, w in sorted_tracks]
return sorted_tracks

View File

@ -1,4 +1,4 @@
from spotframework.model.service import CurrentlyPlaying from spotframework.model.track import CurrentlyPlaying
from spotframework.net.network import Network from spotframework.net.network import Network
from typing import Optional from typing import Optional

View File

@ -1,23 +1,60 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from enum import Enum from enum import Enum
from typing import TYPE_CHECKING from dataclasses import dataclass
from datetime import datetime
from typing import List, Union from typing import List, Union
from spotframework.util.console import Color
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
if TYPE_CHECKING: import spotframework.model.artist
from spotframework.model.artist import Artist import spotframework.model.service
from spotframework.model.track import Track import spotframework.model.track
class Album: @dataclass
def __init__(self, name: str, artists: List[Artist], tracks: List[Track] = None): class SimplifiedAlbum:
self.name = name class Type(Enum):
self.artists = artists single = 0
if tracks is not None: compilation = 1
self.tracks = tracks album = 2
else:
self.tracks = [] album_type: SimplifiedAlbum.Type
artists: List[spotframework.model.artist.SimplifiedArtist]
available_markets: List[str]
external_urls: dict
href: str
id: str
images: List[spotframework.model.service.Image]
name: str
release_date: datetime
release_date_precision: str
type: str
uri: Union[str, Uri]
total_tracks: int = None
def __post_init__(self):
if isinstance(self.album_type, str):
self.album_type = SimplifiedAlbum.Type[self.album_type.strip().lower()]
if isinstance(self.uri, str):
self.uri = Uri(self.uri)
if self.uri:
if self.uri.object_type != Uri.ObjectType.album:
raise TypeError('provided uri not for an album')
if all((isinstance(i, dict) for i in self.artists)):
self.artists = [spotframework.model.artist.SimplifiedArtist(**i) for i in self.artists]
if all((isinstance(i, dict) for i in self.images)):
self.images = [spotframework.model.service.Image(**i) for i in self.images]
if isinstance(self.release_date, str):
if self.release_date_precision == 'year':
self.release_date = datetime.strptime(self.release_date, '%Y')
elif self.release_date_precision == 'month':
self.release_date = datetime.strptime(self.release_date, '%Y-%m')
elif self.release_date_precision == 'day':
self.release_date = datetime.strptime(self.release_date, '%Y-%m-%d')
@property @property
def artists_names(self) -> str: def artists_names(self) -> str:
@ -32,110 +69,56 @@ class Album:
return f'{self.name} / {artists}' return f'{self.name} / {artists}'
def __repr__(self):
return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + \
f': {self.name}, [{self.artists}]'
def __len__(self): @dataclass
return len(self.tracks) class AlbumFull(SimplifiedAlbum):
@staticmethod copyrights: List[dict] = None
def wrap(name: str = None, external_ids: dict = None
artists: Union[str, List[str]] = None): genres: List[str] = None
return Album(name=name, artists=[Artist(i) for i in artists])
label: str = None
class SpotifyAlbum(Album):
class Type(Enum):
single = 0
compilation = 1
album = 2
def __init__(self,
name: str,
artists: List[Artist],
album_type: Type,
href: str = None,
uri: Union[str, Uri] = None,
genres: List[str] = None,
tracks: List[Track] = None,
release_date: str = None,
release_date_precision: str = None,
label: str = None,
popularity: int = None popularity: int = None
): tracks: List[spotframework.model.track.SimplifiedTrack] = None
super().__init__(name, artists, tracks=tracks)
self.href = href def __post_init__(self):
if isinstance(uri, str):
self.uri = Uri(uri) if isinstance(self.album_type, str):
else: self.album_type = SimplifiedAlbum.Type[self.album_type]
self.uri = uri
if isinstance(self.uri, str):
self.uri = Uri(self.uri)
if self.uri: if self.uri:
if self.uri.object_type != Uri.ObjectType.album: if self.uri.object_type != Uri.ObjectType.album:
raise TypeError('provided uri not for an album') raise TypeError('provided uri not for an album')
self.album_type = album_type if all((isinstance(i, dict) for i in self.artists)):
self.artists = [spotframework.model.artist.SimplifiedArtist(**i) for i in self.artists]
self.genres = genres if all((isinstance(i, dict) for i in self.images)):
self.images = [spotframework.model.service.Image(**i) for i in self.images]
self.release_date = release_date if all((isinstance(i, dict) for i in self.tracks)):
self.release_date_precision = release_date_precision self.tracks = [spotframework.model.track.SimplifiedTrack(**i) for i in self.tracks]
self.label = label if isinstance(self.release_date, str):
self.popularity = popularity if self.release_date_precision == 'year':
self.release_date = datetime.strptime(self.release_date, '%Y')
def __repr__(self): elif self.release_date_precision == 'month':
return Color.DARKCYAN + Color.BOLD + 'SpotifyAlbum' + Color.END + \ self.release_date = datetime.strptime(self.release_date, '%Y-%m')
f': {self.name}, {self.artists}, {self.uri}, {self.tracks}' elif self.release_date_precision == 'day':
self.release_date = datetime.strptime(self.release_date, '%Y-%m-%d')
@staticmethod
def wrap(uri: Uri = None,
name: str = None,
artists: Union[str, List[str]] = None):
if uri:
return SpotifyAlbum(name=None, artists=None, uri=uri)
else:
return super().wrap(name=name, artists=artists)
class LibraryAlbum(SpotifyAlbum): @dataclass
def __init__(self, class LibraryAlbum:
name: str, added_at: datetime
artists: List[Artist], album: AlbumFull
album_type: SpotifyAlbum.Type, def __post_init__(self):
if isinstance(self.album, dict):
self.album = AlbumFull(**self.album)
href: str = None, if isinstance(self.added_at, str):
uri: Union[str, Uri] = None, self.added_at = datetime.strptime(self.added_at, '%Y-%m-%dT%H:%M:%S%z')
genres: List[str] = None,
tracks: List = None,
release_date: str = None,
release_date_precision: str = None,
label: str = None,
popularity: int = None,
added_at: datetime = None
):
super().__init__(name=name,
artists=artists,
album_type=album_type,
href=href,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity)
self.added_at = added_at

View File

@ -1,51 +1,43 @@
from dataclasses import dataclass
from typing import List, Union from typing import List, Union
from spotframework.util.console import Color
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
from spotframework.model.service import Image
class Artist: @dataclass
def __init__(self, name: str): class SimplifiedArtist:
self.name = name name: str
external_urls: dict
href: str
id: str
uri: Union[str, Uri]
type: str
def __str__(self): def __post_init__(self):
return f'{self.name}' if isinstance(self.uri, str):
self.uri = Uri(self.uri)
def __repr__(self):
return Color.PURPLE + Color.BOLD + 'Artist' + Color.END + \
f': {self.name}'
class SpotifyArtist(Artist):
def __init__(self,
name: str,
href: str = None,
uri: Union[str, Uri] = None,
genres: List[str] = None,
popularity: int = None
):
super().__init__(name)
self.href = href
if isinstance(uri, str):
self.uri = Uri(uri)
else:
self.uri = uri
if self.uri: if self.uri:
if self.uri.object_type != Uri.ObjectType.artist: if self.uri.object_type != Uri.ObjectType.artist:
raise TypeError('provided uri not for an artist') raise TypeError('provided uri not for an artist')
self.genres = genres def __str__(self):
return f'{self.name}'
self.popularity = popularity
def __repr__(self): @dataclass
return Color.PURPLE + Color.BOLD + 'SpotifyArtist' + Color.END + \ class ArtistFull(SimplifiedArtist):
f': {self.name}, {self.uri}' genres: List[str]
images: List[Image]
popularity: int
@staticmethod def __post_init__(self):
def wrap(uri: Uri): if isinstance(self.uri, str):
return SpotifyArtist(name=None, uri=uri) self.uri = Uri(self.uri)
if self.uri:
if self.uri.object_type != Uri.ObjectType.artist:
raise TypeError('provided uri not for an artist')
if all((isinstance(i, dict) for i in self.images)):
self.images = [Image(**i) for i in self.images]

View File

@ -1,7 +1,8 @@
from spotframework.model.user import User from dataclasses import dataclass
from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack from spotframework.model.user import PublicUser
from spotframework.model.track import TrackFull, PlaylistTrack
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
from spotframework.util.console import Color from spotframework.model.service import Image
from tabulate import tabulate from tabulate import tabulate
from typing import List, Union from typing import List, Union
import logging import logging
@ -9,68 +10,65 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Playlist: @dataclass
class SimplifiedPlaylist:
collaborative: bool
description: str
external_urls: dict
href: str
id: str
images: List[Image]
name: str
owner: PublicUser
primary_color: str
public: bool
snapshot_id: str
tracks: List[PlaylistTrack]
type: str
uri: Union[str, Uri]
def __init__(self, def __post_init__(self):
name: str = None, if isinstance(self.tracks, dict):
description: str = None): self.tracks = []
self._tracks = []
self.name = name if isinstance(self.uri, str):
self.description = description self.uri = Uri(self.uri)
if self.uri:
if self.uri.object_type != Uri.ObjectType.playlist:
raise TypeError('provided uri not for a playlist')
if all((isinstance(i, dict) for i in self.images)):
self.images = [Image(**i) for i in self.images]
if isinstance(self.owner, dict):
self.owner = PublicUser(**self.owner)
def has_tracks(self) -> bool: def has_tracks(self) -> bool:
if len(self.tracks) > 0: return bool(len(self.tracks) > 0)
return True
else:
return False
def __len__(self): def __len__(self):
return len(self.tracks) return len(self.tracks)
def __getitem__(self, item) -> Track: def __getitem__(self, item) -> PlaylistTrack:
return self.tracks[item] return self.tracks[item]
def __iter__(self): def __iter__(self):
return iter(self.tracks) return iter(self.tracks)
@property
def tracks(self) -> List[Track]:
return self._tracks
@tracks.setter
def tracks(self, value: List[Track]):
tracks = []
not_tracks = []
for track in value:
if isinstance(track, Track):
tracks.append(track)
else:
not_tracks.append(track)
if len(not_tracks) > 0:
logger.error('playlist tracks must be off type Track')
self._tracks = tracks
def __str__(self): def __str__(self):
prefix = f'\n==={self.name}===\n\n' if self.name is not None else '' prefix = f'\n==={self.name}===\n\n' if self.name is not None else ''
table = prefix + self.get_tracks_string() + '\n' + f'total: {len(self)}' table = prefix + self.get_tracks_string() + '\n' + f'total: {len(self)}'
return table return table
def __repr__(self):
return Color.GREEN + Color.BOLD + 'Playlist' + Color.END + \
f': {self.name}, ({len(self)})'
def __add__(self, other): def __add__(self, other):
if isinstance(other, Track): if isinstance(other, PlaylistTrack):
self.tracks.append(other) self.tracks.append(other)
return self return self
elif isinstance(other, list): elif isinstance(other, list):
if all((isinstance(i, Track) for i in other)): if all((isinstance(i, PlaylistTrack) for i in other)):
self.tracks += other self.tracks += other
return self return self
else: else:
@ -82,12 +80,12 @@ class Playlist:
raise TypeError('list of tracks needed to add') raise TypeError('list of tracks needed to add')
def __sub__(self, other): def __sub__(self, other):
if isinstance(other, Track): if isinstance(other, PlaylistTrack):
self.tracks.remove(other) self.tracks.remove(other)
return self return self
elif isinstance(other, list): elif isinstance(other, list):
if all((isinstance(i, Track) for i in other)): if all((isinstance(i, PlaylistTrack) for i in other)):
self.tracks -= other self.tracks -= other
return self return self
else: else:
@ -103,12 +101,12 @@ class Playlist:
rows = [] rows = []
headers = ['name', 'album', 'artist', 'added at', 'popularity', 'uri'] headers = ['name', 'album', 'artist', 'added at', 'popularity', 'uri']
for track in self.tracks: for track in self.tracks:
track_row = [track.name, track_row = [track.track.name,
track.album.name, track.track.album.name,
track.artists_names, track.track.artists_names,
track.added_at if isinstance(track, PlaylistTrack) else '', track.added_at if isinstance(track, PlaylistTrack) else '',
track.popularity if isinstance(track, SpotifyTrack) else '', track.popularity if isinstance(track, TrackFull) else '',
track.uri if isinstance(track, SpotifyTrack) else ''] track.uri if isinstance(track, TrackFull) else '']
rows.append(track_row) rows.append(track_row)
@ -117,49 +115,14 @@ class Playlist:
return table return table
class SpotifyPlaylist(Playlist): @dataclass
class FullPlaylist(SimplifiedPlaylist):
def __init__(self, followers: dict = None
uri: Union[str, Uri],
name: str = None,
owner: User = None,
description: str = None,
href: str = None,
collaborative: bool = None,
public: bool = None,
ext_spotify: str = None,
images: List[str] = None):
super().__init__(name=name, description=description)
self.owner = owner
self.href = href
if isinstance(uri, str):
self.uri = Uri(uri)
else:
self.uri = uri
if self.uri.object_type != Uri.ObjectType.playlist:
raise TypeError('provided uri not for a playlist')
self.collaborative = collaborative
self.public = public
self.ext_spotify = ext_spotify
self.images = images
def __str__(self): def __str__(self):
prefix = f'\n==={self.name}===\n\n' if self.name is not None else '' prefix = f'\n==={self.name}===\n\n' if self.name is not None else ''
prefix += f'uri: {self.uri}\n' if self.uri is not None else '' prefix += f'uri: {self.uri}\n' if self.uri is not None else ''
table = prefix + self.get_tracks_string() + '\n' + f'total: {len(self)}' table = prefix + self.get_tracks_string() + '\n' + f'total: {len(self)}'
return table return table
def __repr__(self):
return Color.GREEN + Color.BOLD + 'SpotifyPlaylist' + Color.END + \
f': {self.name} ({self.owner}), ({len(self)}), {self.uri}'

View File

@ -1,116 +1,12 @@
from datetime import datetime from dataclasses import dataclass
from spotframework.model.track import Track
from spotframework.model.uri import Uri
from enum import Enum
from typing import Union
class Context: @dataclass
def __init__(self, class Image:
uri: Union[str, Uri], height: int
object_type: str = None, width: int
href: str = None, url: str
external_spot: str = None):
if isinstance(uri, str):
self.uri = Uri(uri)
else:
self.uri = uri
if self.uri.object_type not in [Uri.ObjectType.album, Uri.ObjectType.artist, Uri.ObjectType.playlist]:
raise TypeError('context uri must be one of album, artist, playlist')
self.object_type = object_type
self.href = href
self.external_spot = external_spot
def __eq__(self, other):
return isinstance(other, Context) and other.uri == self.uri
def __repr__(self):
return f'Context: {self.object_type} uri({self.uri})'
def __str__(self):
return str(self.uri)
class Device:
class DeviceType(Enum):
COMPUTER = 1
TABLET = 2
SMARTPHONE = 3
SPEAKER = 4
TV = 5
AVR = 6
STB = 7
AUDIODONGLE = 8
GAMECONSOLE = 9
CASTVIDEO = 10
CASTAUDIO = 11
AUTOMOBILE = 12
UNKNOWN = 13
def __init__(self,
device_id: str,
is_active: bool,
is_private_session: bool,
is_restricted: bool,
name: str,
object_type: DeviceType,
volume: int):
self.device_id = device_id
self.is_active = is_active
self.is_private_session = is_private_session
self.is_restricted = is_restricted
self.name = name
self.object_type = object_type
self.volume = volume
def __repr__(self):
return f'Device: {self.name} active({self.is_active}) type({self.object_type}) vol({self.volume})'
def __str__(self):
return self.name
class CurrentlyPlaying:
def __init__(self,
context: Context,
timestamp: datetime,
progress_ms: int,
is_playing: bool,
track: Track,
device: Device,
shuffle: bool,
repeat: bool,
currently_playing_type: str):
self.context = context
self.timestamp = timestamp
self.progress_ms = progress_ms
self.is_playing = is_playing
self.track = track
self.device = device
self.shuffle = shuffle
self.repeat = repeat
self.currently_playing_type = currently_playing_type
def __repr__(self):
return f'CurrentlyPlaying: is_playing({self.is_playing}) progress({self.progress_ms}) ' \
f'context({self.context}) track({self.track}) device({self.device}) shuffle({self.shuffle}) ' \
f'repeat({self.repeat}) time({self.timestamp})'
def __eq__(self, other):
return isinstance(other, CurrentlyPlaying) and other.track == self.track and other.context == self.context
@staticmethod @staticmethod
def _format_duration(duration): def wrap(**kwargs):
total_seconds = duration / 1000 return Image(**kwargs)
minutes = int((total_seconds/60) % 60)
seconds = int(total_seconds % 60)
return f'{minutes}:{seconds}'
def __str__(self):
if self.is_playing:
playing = 'playing'
else:
playing = '(paused)'
return f'{playing} {self.track} on {self.device} from {self.context} ({self._format_duration(self.progress_ms)})'

View File

@ -1,322 +1,182 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Union, List
from typing import List, Union
from datetime import datetime from datetime import datetime
from dataclasses import dataclass, field
import spotframework.model
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
from spotframework.util.console import Color
from spotframework.util import convert_ms_to_minute_string
from enum import Enum from enum import Enum
import spotframework.model.album
import spotframework.model.artist
import spotframework.model.service
if TYPE_CHECKING: if TYPE_CHECKING:
from spotframework.model.album import Album, SpotifyAlbum from spotframework.model.user import PublicUser
from spotframework.model.artist import Artist
from spotframework.model.user import User
from spotframework.model.service import Context
class Track: @dataclass
def __init__(self, class SimplifiedTrack:
name: str, artists: List[spotframework.model.artist.SimplifiedArtist]
album: Album, available_markets: List[str]
artists: List[Artist], disc_number: int
duration_ms: int
external_urls: dict
explicit: bool
href: str
id: str
name: str
preview_url: str
track_number: int
type: str
uri: Union[str, Uri]
is_local: bool
is_playable: bool = None
episode: bool = None
track: bool = None
disc_number: int = None, def __post_init__(self):
track_number: int = None, if isinstance(self.uri, str):
duration_ms: int = None, self.uri = Uri(self.uri)
excplicit: bool = None
):
self.name = name
self.album = album
self.artists = artists
self.disc_number = disc_number if self.uri:
self.track_number = track_number if self.uri.object_type != Uri.ObjectType.track:
self.duration_ms = duration_ms raise TypeError('provided uri not for a track')
self.explicit = excplicit
if all((isinstance(i, dict) for i in self.artists)):
self.artists = [spotframework.model.artist.SimplifiedArtist(**i) for i in self.artists]
@property @property
def artists_names(self) -> str: def artists_names(self) -> str:
return self._join_strings([i.name for i in self.artists]) return self._join_strings([i.name for i in self.artists])
@property
def album_artists_names(self) -> str:
return self.album.artists_names
@staticmethod @staticmethod
def _join_strings(string_list: List[str]): def _join_strings(string_list: List[str]):
return ', '.join(string_list) return ', '.join(string_list)
def __str__(self): def __str__(self):
album = self.album.name if self.album is not None else 'n/a'
artists = ', '.join([i.name for i in self.artists]) if self.artists is not None else 'n/a' artists = ', '.join([i.name for i in self.artists]) if self.artists is not None else 'n/a'
return f'{self.name} / {album} / {artists}' return f'{self.name} / {artists}'
def __repr__(self):
return Color.YELLOW + Color.BOLD + 'Track' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}'
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, Track) and other.name == self.name and other.artists == self.artists return isinstance(other, SimplifiedTrack) and other.name == self.name and other.artists == self.artists
@staticmethod
def wrap(name: str = None,
artists: List[str] = None,
album: str = None,
album_artists: List[str] = None):
return Track(name=name,
album=Album.wrap(name=album, artists=album_artists),
artists=[Artist(i) for i in artists])
class SpotifyTrack(Track): @dataclass
def __init__(self, class TrackFull(SimplifiedTrack):
name: str, album: spotframework.model.album.SimplifiedAlbum = None
album: SpotifyAlbum, external_ids: dict = None
artists: List[Artist], popularity: int = None
href: str = None, @property
uri: Union[str, Uri] = None, def album_artists_names(self) -> str:
return self.album.artists_names
disc_number: int = None, def __post_init__(self):
track_number: int = None, if isinstance(self.uri, str):
duration_ms: int = None, self.uri = Uri(self.uri)
explicit: bool = None,
is_playable: bool = None,
popularity: int = None,
audio_features: AudioFeatures = None
):
super().__init__(name=name, album=album, artists=artists,
disc_number=disc_number,
track_number=track_number,
duration_ms=duration_ms,
excplicit=explicit)
self.href = href
if isinstance(uri, str):
self.uri = Uri(uri)
else:
self.uri = uri
if self.uri:
if self.uri.object_type != Uri.ObjectType.track: if self.uri.object_type != Uri.ObjectType.track:
raise TypeError('provided uri not for a track') raise TypeError('provided uri not for a track')
self.is_playable = is_playable if all((isinstance(i, dict) for i in self.artists)):
self.artists = [spotframework.model.artist.SimplifiedArtist(**i) for i in self.artists]
self.popularity = popularity if isinstance(self.album, dict):
self.album = spotframework.model.album.SimplifiedAlbum(**self.album)
self.audio_features = audio_features
def __repr__(self):
string = Color.BOLD + Color.YELLOW + 'SpotifyTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, SpotifyTrack) and other.uri == self.uri return isinstance(other, TrackFull) and other.uri == self.uri
@staticmethod
def wrap(uri: Uri = None,
name: str = None,
artists: Union[str, List[str]] = None,
album: str = None,
album_artists: Union[str, List[str]] = None):
if uri:
return SpotifyTrack(name=None, album=None, artists=None, uri=uri)
else:
return super().wrap(name=name, artists=artists, album=album, album_artists=album_artists)
class LibraryTrack(SpotifyTrack): @dataclass
def __init__(self, class LibraryTrack:
name: str, added_at: datetime
album: SpotifyAlbum, track: TrackFull
artists: List[Artist],
href: str = None, def __post_init__(self):
uri: Union[str, Uri] = None, if isinstance(self.track, dict):
self.track = TrackFull(**self.track)
disc_number: int = None, if isinstance(self.added_at, str):
track_number: int = None, self.added_at = datetime.strptime(self.added_at, '%Y-%m-%dT%H:%M:%S%z')
duration_ms: int = None,
explicit: bool = None,
is_playable: bool = None,
popularity: int = None,
audio_features: AudioFeatures = None,
added_at: datetime = None
):
super().__init__(name=name, album=album, artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
track_number=track_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity,
audio_features=audio_features)
self.added_at = added_at
def __repr__(self):
string = Color.BOLD + Color.YELLOW + 'LibraryTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.added_at}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
class PlaylistTrack(SpotifyTrack): @dataclass
def __init__(self, class PlaylistTrack:
name: str, added_at: datetime
album: SpotifyAlbum, added_by: PublicUser
artists: List[Artist], is_local: bool
primary_color: str
track: TrackFull
video_thumbnail: dict
added_at: datetime, def __post_init__(self):
added_by: User, if isinstance(self.track, dict):
is_local: bool, self.track = TrackFull(**self.track)
href: str = None, if isinstance(self.added_at, str):
uri: Union[str, Uri] = None, self.added_at = datetime.strptime(self.added_at, '%Y-%m-%dT%H:%M:%S%z')
disc_number: int = None,
track_number: int = None,
duration_ms: int = None,
explicit: bool = None,
is_playable: bool = None,
popularity: int = None,
audio_features: AudioFeatures = None
):
super().__init__(name=name, album=album, artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
track_number=track_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity,
audio_features=audio_features)
self.added_at = added_at
self.added_by = added_by
self.is_local = is_local
def __repr__(self):
string = Color.BOLD + Color.YELLOW + 'PlaylistTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.added_at}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
class PlayedTrack(SpotifyTrack): @dataclass
def __init__(self, class PlayedTrack:
name: str, played_at: datetime
album: SpotifyAlbum, context: Context
artists: List[Artist], track: SimplifiedTrack
href: str = None, def __post_init__(self):
uri: Union[str, Uri] = None, if isinstance(self.context, dict):
self.context = Context(**self.context)
disc_number: int = None, if isinstance(self.track, dict):
track_number: int = None, self.track = TrackFull(**self.track)
duration_ms: int = None, if isinstance(self.played_at, str):
explicit: bool = None, self.played_at = datetime.strptime(self.played_at, '%Y-%m-%dT%H:%M:%S%z')
is_playable: bool = None,
popularity: int = None,
audio_features: AudioFeatures = None,
played_at: datetime = None,
context: Context = None
):
super().__init__(name=name, album=album, artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
track_number=track_number,
explicit=explicit,
is_playable=is_playable,
popularity=popularity,
audio_features=audio_features)
self.played_at = played_at
self.context = context
def __repr__(self):
string = Color.BOLD + Color.YELLOW + 'PlayedTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.played_at}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
@dataclass
class AudioFeatures: class AudioFeatures:
acousticness: float
analysis_url: str
danceability: float
duration_ms: int
energy: float
uri: Uri
instrumentalness: float
key: int
liveness: float
loudness: float
mode: AudioFeatures.Mode
speechiness: float
tempo: float
time_signature: int
track_href: str
valence: float
type: str
id: str
class Mode(Enum): class Mode(Enum):
MINOR = 0 MINOR = 0
MAJOR = 1 MAJOR = 1
def __init__(self, def __post_init__(self):
acousticness: float, self.acousticness = self.check_float(self.acousticness)
analysis_url: str, self.danceability = self.check_float(self.danceability)
danceability: float, self.energy = self.check_float(self.energy)
duration_ms: int, self.instrumentalness = self.check_float(self.instrumentalness)
energy: float, self.liveness = self.check_float(self.liveness)
uri: Uri,
instrumentalness: float,
key: int,
liveness: float,
loudness: float,
mode: int,
speechiness: float,
tempo: float,
time_signature: int,
track_href: str,
valence: float):
self.acousticness = self.check_float(acousticness)
self.analysis_url = analysis_url
self.danceability = self.check_float(danceability)
self.duration_ms = duration_ms
self.energy = self.check_float(energy)
self.uri = uri
self.instrumentalness = self.check_float(instrumentalness)
self._key = key
self.liveness = self.check_float(liveness)
self.loudness = loudness
if mode == 0: if self.mode == 0:
self.mode = self.Mode.MINOR self.mode = self.Mode.MINOR
elif mode == 1: elif self.mode == 1:
self.mode = self.Mode.MAJOR self.mode = self.Mode.MAJOR
else: else:
raise ValueError('illegal value for mode') raise ValueError('illegal value for mode')
self.speechiness = self.check_float(speechiness) self.speechiness = self.check_float(self.speechiness)
self.tempo = tempo self.valence = self.check_float(self.valence)
self.time_signature = time_signature
self.track_href = track_href if isinstance(self.mode, int):
self.valence = self.check_float(valence) self.mode = AudioFeatures.Mode(self.mode)
def to_dict(self): def to_dict(self):
return { return {
@ -340,7 +200,7 @@ class AudioFeatures:
} }
@property @property
def key(self) -> str: def key_str(self) -> str:
legend = { legend = {
0: 'C', 0: 'C',
1: 'C#', 1: 'C#',
@ -355,21 +215,22 @@ class AudioFeatures:
10: 'A#', 10: 'A#',
11: 'B' 11: 'B'
} }
if legend.get(self._key, None): if legend.get(self.key, None):
return legend.get(self._key, None) return legend.get(self.key, None)
else: else:
raise ValueError('key value out of bounds') raise ValueError('key value out of bounds')
@key.setter @key_str.setter
def key(self, value): def key_str(self, value):
if isinstance(value, int): if isinstance(value, int):
if 0 <= value <= 11: if 0 <= value <= 11:
self._key = value self.key = value
else: else:
raise ValueError('key value out of bounds') raise ValueError('key value out of bounds')
else: else:
raise ValueError('key value not integer') raise ValueError('key value not integer')
@property
def is_live(self): def is_live(self):
if self.liveness is not None: if self.liveness is not None:
if self.liveness > 0.8: if self.liveness > 0.8:
@ -379,6 +240,7 @@ class AudioFeatures:
else: else:
raise ValueError('no value for liveness') raise ValueError('no value for liveness')
@property
def is_instrumental(self): def is_instrumental(self):
if self.instrumentalness is not None: if self.instrumentalness is not None:
if self.instrumentalness > 0.5: if self.instrumentalness > 0.5:
@ -388,6 +250,7 @@ class AudioFeatures:
else: else:
raise ValueError('no value for instrumentalness') raise ValueError('no value for instrumentalness')
@property
def is_spoken_word(self): def is_spoken_word(self):
if self.speechiness is not None: if self.speechiness is not None:
if self.speechiness > 0.66: if self.speechiness > 0.66:
@ -409,10 +272,122 @@ class AudioFeatures:
else: else:
raise ValueError(f'value {value} is not float') raise ValueError(f'value {value} is not float')
def __repr__(self):
return Color.BOLD + Color.DARKCYAN + 'AudioFeatures' + Color.END + \ @dataclass
f': acoustic:{self.acousticness}, dance:{self.danceability}, ' \ class Context:
f'duration:{convert_ms_to_minute_string(self.duration_ms)}, energy:{self.energy}, ' \ uri: Union[str, Uri]
f'instrumental:{self.instrumentalness}, key:{self.key}, live:{self.liveness}, ' \ type: str = None
f'volume:{self.loudness}db, mode:{self.mode.name}, speech:{self.speechiness}, tempo:{self.tempo}, ' \ href: str = None
f'time_sig:{self.time_signature}, valence:{self.valence}' external_urls: dict = field(default_factory=dict)
def __post_init__(self):
if isinstance(self.uri, str):
self.uri = Uri(self.uri)
if self.uri:
if self.uri.object_type not in [Uri.ObjectType.album, Uri.ObjectType.artist, Uri.ObjectType.playlist]:
raise TypeError('context uri must be one of album, artist, playlist')
def __eq__(self, other):
return isinstance(other, Context) and other.uri == self.uri
def __str__(self):
return str(self.uri)
@dataclass
class Device:
class DeviceType(Enum):
COMPUTER = 1
TABLET = 2
SMARTPHONE = 3
SPEAKER = 4
TV = 5
AVR = 6
STB = 7
AUDIODONGLE = 8
GAMECONSOLE = 9
CASTVIDEO = 10
CASTAUDIO = 11
AUTOMOBILE = 12
UNKNOWN = 13
id: str
is_active: bool
is_private_session: bool
is_restricted: bool
name: str
type: DeviceType
volume_percent: int
def __post_init__(self):
if isinstance(self.type, str):
self.type = Device.DeviceType[self.type.upper()]
def __str__(self):
return self.name
@dataclass
class CurrentlyPlaying:
context: Context
timestamp: str
progress_ms: int
is_playing: bool
item: spotframework.model.track.SimplifiedTrack
device: Device
shuffle_state: bool
repeat_state: bool
currently_playing_type: str
actions: dict
def __post_init__(self):
if isinstance(self.context, Context):
self.context = Context(**self.context)
if isinstance(self.item, spotframework.model.track.SimplifiedTrack):
self.item = spotframework.model.track.SimplifiedTrack(**self.item)
if isinstance(self.device, Device):
self.device = Device(**self.device)
def __eq__(self, other):
return isinstance(other, CurrentlyPlaying) and other.item == self.item and other.context == self.context
@staticmethod
def _format_duration(duration):
total_seconds = duration / 1000
minutes = int((total_seconds/60) % 60)
seconds = int(total_seconds % 60)
return f'{minutes}:{seconds}'
def __str__(self):
if self.is_playing:
playing = 'playing'
else:
playing = '(paused)'
return f'{playing} {self.item} on {self.device} from {self.context} ({self._format_duration(self.progress_ms)})'
@dataclass
class RecommendationsSeed:
afterFilteringSize: int
afterRelinkingSize: int
href: str
id: str
initialPoolSize: int
type: str
@dataclass
class Recommendations:
seeds: List[RecommendationsSeed]
tracks: List[spotframework.model.track.SimplifiedTrack]
def __post_init__(self):
if all((isinstance(i, dict) for i in self.seeds)):
self.seeds = [RecommendationsSeed(**i) for i in self.seeds]
if all((isinstance(i, dict) for i in self.tracks)):
self.tracks = [spotframework.model.track.TrackFull(**i) for i in self.tracks]

View File

@ -1,31 +1,39 @@
from spotframework.util.console import Color from typing import Union, List
from dataclasses import dataclass, field
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
from typing import Union from spotframework.model.service import Image
class User: @dataclass
def __init__(self, class PublicUser:
username: str, href: str
id: str
uri: Union[str, Uri]
display_name: str
external_urls: dict
type: str
href: str = None, followers: dict = field(default_factory=dict)
uri: Union[str, Uri] = None, images: List[Image] = field(default_factory=list)
display_name: str = None, def __post_init__(self):
ext_spotify: str = None): if isinstance(self.uri, str):
self.username = username self.uri = Uri(self.uri)
self.href = href if self.uri:
if isinstance(uri, str): if self.uri.object_type != Uri.ObjectType.user:
self.uri = Uri(uri) raise TypeError('provided uri not for a user')
else:
self.uri = uri
self.display_name = display_name if all((isinstance(i, dict) for i in self.images)):
self.ext_spotify = ext_spotify self.images = [Image(**i) for i in self.images]
def __str__(self): def __str__(self):
return f'{self.username}' return f'{self.display_name}'
@dataclass
class PrivateUser(PublicUser):
country: str = None
email: str = None
product: str = None
def __repr__(self):
return Color.RED + Color.BOLD + 'User' + Color.END + \
f': {self.username}, {self.display_name}, {self.uri}'

View File

@ -2,17 +2,18 @@ import requests
import random import random
import logging import logging
import time import time
from dataclasses import dataclass
from typing import List, Optional, Union from typing import List, Optional, Union
import datetime import datetime
from spotframework.model.artist import SpotifyArtist from spotframework.model.artist import ArtistFull
from spotframework.model.user import User from spotframework.model.user import PublicUser
from . import const from . import const
from spotframework.net.user import NetworkUser from spotframework.net.user import NetworkUser
from spotframework.model.playlist import SpotifyPlaylist from spotframework.model.playlist import SimplifiedPlaylist, FullPlaylist
from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack, AudioFeatures from spotframework.model.track import SimplifiedTrack, TrackFull, PlaylistTrack, PlayedTrack, LibraryTrack, \
from spotframework.model.album import LibraryAlbum, SpotifyAlbum AudioFeatures, Device, CurrentlyPlaying, Recommendations
from spotframework.model.service import CurrentlyPlaying, Device, Context from spotframework.model.album import AlbumFull, LibraryAlbum, SimplifiedAlbum
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
from requests.models import Response from requests.models import Response
@ -21,16 +22,12 @@ limit = 50
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class SearchResponse: class SearchResponse:
def __init__(self, tracks: List[TrackFull]
tracks: List[SpotifyTrack], albums: List[SimplifiedAlbum]
albums: List[SpotifyAlbum], artists: List[ArtistFull]
artists: List[SpotifyArtist], playlists: List[SimplifiedPlaylist]
playlists: List[SpotifyPlaylist]):
self.tracks = tracks
self.albums = albums
self.artists = artists
self.playlists = playlists
@property @property
def all(self): def all(self):
@ -95,7 +92,7 @@ class Network:
elif req.status_code == 401: elif req.status_code == 401:
logger.warning(f'{method} access token expired, refreshing') logger.warning(f'{method} access token expired, refreshing')
self.user.refresh_token() self.user.refresh_access_token()
if self.refresh_counter < 5: if self.refresh_counter < 5:
self.refresh_counter += 1 self.refresh_counter += 1
return self.get_request(method, url, params, headers) return self.get_request(method, url, params, headers)
@ -153,7 +150,7 @@ class Network:
elif req.status_code == 401: elif req.status_code == 401:
logger.warning(f'{method} access token expired, refreshing') logger.warning(f'{method} access token expired, refreshing')
self.user.refresh_token() self.user.refresh_access_token()
if self.refresh_counter < 5: if self.refresh_counter < 5:
self.refresh_counter += 1 self.refresh_counter += 1
return self.post_request(method, url, params, json, headers) return self.post_request(method, url, params, json, headers)
@ -211,7 +208,7 @@ class Network:
elif req.status_code == 401: elif req.status_code == 401:
logger.warning(f'{method} access token expired, refreshing') logger.warning(f'{method} access token expired, refreshing')
self.user.refresh_token() self.user.refresh_access_token()
if self.refresh_counter < 5: if self.refresh_counter < 5:
self.refresh_counter += 1 self.refresh_counter += 1
return self.put_request(method, url, params, json, headers) return self.put_request(method, url, params, json, headers)
@ -232,7 +229,7 @@ class Network:
def get_playlist(self, def get_playlist(self,
uri: Uri = None, uri: Uri = None,
uri_string: str = None, uri_string: str = None,
tracks: bool = True) -> Optional[SpotifyPlaylist]: tracks: bool = True) -> Optional[FullPlaylist]:
"""get playlist object with tracks for uri """get playlist object with tracks for uri
:param uri: target request uri :param uri: target request uri
@ -252,19 +249,19 @@ class Network:
resp = self.get_request('getPlaylist', f'playlists/{uri.object_id}') resp = self.get_request('getPlaylist', f'playlists/{uri.object_id}')
if resp: if resp:
playlist = self.parse_playlist(resp) playlist = FullPlaylist(**resp)
if tracks and resp.get('tracks'): if resp.get('tracks'):
if 'next' in resp['tracks']: if 'next' in resp['tracks']:
logger.debug(f'paging tracks for {uri}') logger.debug(f'paging tracks for {uri}')
track_pager = PageCollection(net=self, page=resp['tracks']) track_pager = PageCollection(net=self, page=resp['tracks'])
track_pager.continue_iteration() track_pager.continue_iteration()
playlist.tracks = [self.parse_track(i) for i in track_pager.items] playlist.tracks = [PlaylistTrack(**i) for i in track_pager.items]
else: else:
logger.debug(f'parsing {len(resp.get("tracks"))} tracks for {uri}') logger.debug(f'parsing {len(resp.get("tracks"))} tracks for {uri}')
playlist.tracks = [self.parse_track(i) for i in resp.get('tracks', [])] playlist.tracks = [PlaylistTrack(**i) for i in resp.get('tracks', [])]
return playlist return playlist
else: else:
@ -276,7 +273,7 @@ class Network:
name: str = 'New Playlist', name: str = 'New Playlist',
public: bool = True, public: bool = True,
collaborative: bool = False, collaborative: bool = False,
description: bool = None) -> Optional[SpotifyPlaylist]: description: bool = None) -> Optional[FullPlaylist]:
"""create playlist for user """create playlist for user
:param username: username for playlist creation :param username: username for playlist creation
@ -297,12 +294,12 @@ class Network:
req = self.post_request('createPlaylist', f'users/{username}/playlists', json=json) req = self.post_request('createPlaylist', f'users/{username}/playlists', json=json)
if 200 <= req.status_code < 300: if 200 <= req.status_code < 300:
return self.parse_playlist(req.json()) return FullPlaylist(**req.json())
else: else:
logger.error('error creating playlist') logger.error('error creating playlist')
return None return None
def get_playlists(self, response_limit: int = None) -> Optional[List[SpotifyPlaylist]]: def get_playlists(self, response_limit: int = None) -> Optional[List[FullPlaylist]]:
"""get current users playlists """get current users playlists
:param response_limit: max playlists to return :param response_limit: max playlists to return
@ -316,7 +313,7 @@ class Network:
pager.total_limit = response_limit pager.total_limit = response_limit
pager.iterate() pager.iterate()
return_items = [self.parse_playlist(i) for i in pager.items] return_items = [SimplifiedPlaylist(**i) for i in pager.items]
if len(return_items) == 0: if len(return_items) == 0:
logger.error('no playlists returned') logger.error('no playlists returned')
@ -337,7 +334,7 @@ class Network:
pager.total_limit = response_limit pager.total_limit = response_limit
pager.iterate() pager.iterate()
return_items = [self.parse_album(i) for i in pager.items] return_items = [LibraryAlbum(**i) for i in pager.items]
if len(return_items) == 0: if len(return_items) == 0:
logger.error('no albums returned') logger.error('no albums returned')
@ -358,14 +355,14 @@ class Network:
pager.total_limit = response_limit pager.total_limit = response_limit
pager.iterate() pager.iterate()
return_items = [self.parse_track(i) for i in pager.items] return_items = [LibraryTrack(**i) for i in pager.items]
if len(return_items) == 0: if len(return_items) == 0:
logger.error('no tracks returned') logger.error('no tracks returned')
return return_items return return_items
def get_user_playlists(self) -> Optional[List[SpotifyPlaylist]]: def get_user_playlists(self) -> Optional[List[FullPlaylist]]:
"""retrieve user owned playlists """retrieve user owned playlists
:return: List of user owned playlists if available :return: List of user owned playlists if available
@ -375,12 +372,12 @@ class Network:
playlists = self.get_playlists() playlists = self.get_playlists()
if self.user.username is None: if self.user.user.id is None:
logger.debug('no user info, refreshing for filter') logger.debug('no user info, refreshing for filter')
self.user.refresh_info() self.user.refresh_info()
if playlists is not None: if playlists is not None:
return list(filter(lambda x: x.owner.username == self.user.username, playlists)) return list(filter(lambda x: x.owner.id == self.user.user.id, playlists))
else: else:
logger.error('no playlists returned to filter') logger.error('no playlists returned to filter')
@ -409,7 +406,7 @@ class Network:
pager.total_limit = response_limit pager.total_limit = response_limit
pager.iterate() pager.iterate()
return_items = [self.parse_track(i) for i in pager.items] return_items = [PlaylistTrack(**i) for i in pager.items]
if len(return_items) == 0: if len(return_items) == 0:
logger.error('no tracks returned') logger.error('no tracks returned')
@ -425,7 +422,7 @@ class Network:
if resp: if resp:
if len(resp['devices']) == 0: if len(resp['devices']) == 0:
logger.error('no devices returned') logger.error('no devices returned')
return [self.parse_device(i) for i in resp['devices']] return [Device(**i) for i in resp['devices']]
else: else:
logger.error('no devices returned') logger.error('no devices returned')
return None return None
@ -462,7 +459,7 @@ class Network:
pager.total_limit = 20 pager.total_limit = 20
pager.continue_iteration() pager.continue_iteration()
return [self.parse_track(i) for i in pager.items] return [PlayedTrack(**i) for i in pager.items]
else: else:
logger.error('no tracks returned') logger.error('no tracks returned')
@ -473,7 +470,7 @@ class Network:
resp = self.get_request('getPlayer', 'me/player') resp = self.get_request('getPlayer', 'me/player')
if resp: if resp:
return self.parse_currently_playing(resp) return CurrentlyPlaying(**resp)
else: else:
logger.info('no player returned') logger.info('no player returned')
@ -490,12 +487,21 @@ class Network:
if devices: if devices:
device = next((i for i in devices if i.name == device_name), None) device = next((i for i in devices if i.name == device_name), None)
if device: if device:
return device.device_id return device.id
else: else:
logger.error(f'{device_name} not found') logger.error(f'{device_name} not found')
else: else:
logger.error('no devices returned') logger.error('no devices returned')
def get_current_user(self) -> Optional[PublicUser]:
logger.info(f"getting current user")
resp = self.get_request('getCurrentUser', 'me')
if resp:
return PublicUser(**resp)
else:
logger.info('no user returned')
def change_playback_device(self, device_id: str): def change_playback_device(self, device_id: str):
"""migrate playback to different device""" """migrate playback to different device"""
@ -733,7 +739,7 @@ class Network:
def get_recommendations(self, def get_recommendations(self,
tracks: List[str] = None, tracks: List[str] = None,
artists: List[str] = None, artists: List[str] = None,
response_limit=10) -> Optional[List[Track]]: response_limit=10) -> Optional[Recommendations]:
logger.info(f'getting {response_limit} recommendations, ' logger.info(f'getting {response_limit} recommendations, '
f'tracks: {len(tracks) if tracks is not None else 0}, ' f'tracks: {len(tracks) if tracks is not None else 0}, '
@ -754,17 +760,13 @@ class Network:
else: else:
resp = self.get_request('getRecommendations', 'recommendations', params=params) resp = self.get_request('getRecommendations', 'recommendations', params=params)
if resp: if resp:
if 'tracks' in resp: return Recommendations(**resp)
return [self.parse_track(i) for i in resp['tracks']]
else:
logger.error('no tracks returned')
return None
else: else:
logger.error('error getting recommendations') logger.error('error getting recommendations')
return None return None
def write_playlist_object(self, def write_playlist_object(self,
playlist: SpotifyPlaylist, playlist: FullPlaylist,
append_tracks: bool = False): append_tracks: bool = False):
logger.info(f'writing {playlist.name}, append tracks: {append_tracks}') logger.info(f'writing {playlist.name}, append tracks: {append_tracks}')
@ -775,10 +777,10 @@ class Network:
elif playlist.tracks: elif playlist.tracks:
if append_tracks: if append_tracks:
self.add_playlist_tracks(playlist.uri, [i.uri for i in playlist.tracks if self.add_playlist_tracks(playlist.uri, [i.uri for i in playlist.tracks if
isinstance(i, SpotifyTrack)]) isinstance(i, SimplifiedTrack)])
else: else:
self.replace_playlist_tracks(uri=playlist.uri, uris=[i.uri for i in playlist.tracks if self.replace_playlist_tracks(uri=playlist.uri, uris=[i.uri for i in playlist.tracks if
isinstance(i, SpotifyTrack)]) isinstance(i, SimplifiedTrack)])
if playlist.name or playlist.collaborative or playlist.public or playlist.description: if playlist.name or playlist.collaborative or playlist.public or playlist.description:
self.change_playlist_details(playlist.uri, self.change_playlist_details(playlist.uri,
@ -819,7 +821,7 @@ class Network:
else: else:
logger.error('error reordering playlist') logger.error('error reordering playlist')
def get_track_audio_features(self, uris: List[Uri]): def get_track_audio_features(self, uris: List[Uri]) -> Optional[List[AudioFeatures]]:
logger.info(f'getting {len(uris)} features') logger.info(f'getting {len(uris)} features')
audio_features = [] audio_features = []
@ -831,13 +833,7 @@ class Network:
if resp: if resp:
if resp.get('audio_features', None): if resp.get('audio_features', None):
return [AudioFeatures(**i) for i in resp['audio_features']]
for feature in resp['audio_features']:
if feature is not None:
audio_features.append(self.parse_audio_features(feature))
else:
audio_features.append(None)
else: else:
logger.error('no audio features included') logger.error('no audio features included')
else: else:
@ -848,10 +844,10 @@ class Network:
else: else:
logger.error('mismatched length of input and response') logger.error('mismatched length of input and response')
def populate_track_audio_features(self, tracks=Union[SpotifyTrack, List[SpotifyTrack]]): def populate_track_audio_features(self, tracks=Union[TrackFull, List[TrackFull]]):
logger.info(f'populating {len(tracks)} features') logger.info(f'populating {len(tracks)} features')
if isinstance(tracks, SpotifyTrack): if isinstance(tracks, TrackFull):
audio_features = self.get_track_audio_features([tracks.uri]) audio_features = self.get_track_audio_features([tracks.uri])
if audio_features: if audio_features:
@ -864,7 +860,7 @@ class Network:
logger.error(f'no audio features returned for {tracks.uri}') logger.error(f'no audio features returned for {tracks.uri}')
elif isinstance(tracks, List): elif isinstance(tracks, List):
if all(isinstance(i, SpotifyTrack) for i in tracks): if all(isinstance(i, TrackFull) for i in tracks):
audio_features = self.get_track_audio_features([i.uri for i in tracks]) audio_features = self.get_track_audio_features([i.uri for i in tracks])
if audio_features: if audio_features:
@ -882,7 +878,7 @@ class Network:
def get_tracks(self, def get_tracks(self,
uris: List[Uri] = None, uris: List[Uri] = None,
uri_strings: List[str] = None) -> List[SpotifyTrack]: uri_strings: List[str] = None) -> List[TrackFull]:
if uris is None and uri_strings is None: if uris is None and uri_strings is None:
raise NameError('no uris provided') raise NameError('no uris provided')
@ -900,11 +896,11 @@ class Network:
for chunk in chunked_uris: for chunk in chunked_uris:
resp = self.get_request(method='getTracks', url='tracks', params={'ids': ','.join([i.object_id for i in chunk])}) resp = self.get_request(method='getTracks', url='tracks', params={'ids': ','.join([i.object_id for i in chunk])})
if resp: if resp:
tracks += [self.parse_track(i) for i in resp.get('tracks', [])] tracks += [TrackFull(**i) for i in resp.get('tracks', [])]
return tracks return tracks
def get_track(self, uri: Uri = None, uri_string: str = None) -> Optional[SpotifyTrack]: def get_track(self, uri: Uri = None, uri_string: str = None) -> Optional[TrackFull]:
if uri is None and uri_string is None: if uri is None and uri_string is None:
raise NameError('no uri provided') raise NameError('no uri provided')
@ -918,7 +914,7 @@ class Network:
else: else:
return None return None
def get_albums(self, uris: List[Uri] = None, uri_strings: List[str] = None) -> List[SpotifyAlbum]: def get_albums(self, uris: List[Uri] = None, uri_strings: List[str] = None) -> List[AlbumFull]:
if uris is None and uri_strings is None: if uris is None and uri_strings is None:
raise NameError('no uris provided') raise NameError('no uris provided')
@ -936,11 +932,11 @@ class Network:
for chunk in chunked_uris: for chunk in chunked_uris:
resp = self.get_request(method='getAlbums', url='albums', params={'ids': ','.join([i.object_id for i in chunk])}) resp = self.get_request(method='getAlbums', url='albums', params={'ids': ','.join([i.object_id for i in chunk])})
if resp: if resp:
albums += [self.parse_album(i) for i in resp.get('albums', [])] albums += [AlbumFull(**i) for i in resp.get('albums', [])]
return albums return albums
def get_album(self, uri: Uri = None, uri_string: str = None) -> Optional[SpotifyAlbum]: def get_album(self, uri: Uri = None, uri_string: str = None) -> Optional[AlbumFull]:
if uri is None and uri_string is None: if uri is None and uri_string is None:
raise NameError('no uri provided') raise NameError('no uri provided')
@ -954,7 +950,7 @@ class Network:
else: else:
return None return None
def get_artists(self, uris: List[Uri] = None, uri_strings: List[str] = None) -> List[SpotifyArtist]: def get_artists(self, uris: List[Uri] = None, uri_strings: List[str] = None) -> List[ArtistFull]:
if uris is None and uri_strings is None: if uris is None and uri_strings is None:
raise NameError('no uris provided') raise NameError('no uris provided')
@ -972,11 +968,11 @@ class Network:
for chunk in chunked_uris: for chunk in chunked_uris:
resp = self.get_request(method='getArtists', url='artists', params={'ids': ','.join([i.object_id for i in chunk])}) resp = self.get_request(method='getArtists', url='artists', params={'ids': ','.join([i.object_id for i in chunk])})
if resp: if resp:
artists += [self.parse_artist(i) for i in resp.get('artists', [])] artists += [ArtistFull(**i) for i in resp.get('artists', [])]
return artists return artists
def get_artist(self, uri: Uri = None, uri_string: str = None) -> Optional[SpotifyArtist]: def get_artist(self, uri: Uri = None, uri_string: str = None) -> Optional[ArtistFull]:
if uri is None and uri_string is None: if uri is None and uri_string is None:
raise NameError('no uri provided') raise NameError('no uri provided')
@ -1022,321 +1018,13 @@ class Network:
resp = self.get_request(method='search', url='search', params=params) resp = self.get_request(method='search', url='search', params=params)
albums = [self.parse_album(i) for i in resp.get('albums', {}).get('items', [])] albums = [SimplifiedAlbum(**i) for i in resp.get('albums', {}).get('items', [])]
artists = [self.parse_artist(i) for i in resp.get('artists', {}).get('items', [])] artists = [ArtistFull(**i) for i in resp.get('artists', {}).get('items', [])]
tracks = [self.parse_track(i) for i in resp.get('tracks', {}).get('items', [])] tracks = [TrackFull(**i) for i in resp.get('tracks', {}).get('items', [])]
playlists = [self.parse_playlist(i) for i in resp.get('playlists', {}).get('items', [])] playlists = [SimplifiedPlaylist(**i) for i in resp.get('playlists', {}).get('items', [])]
return SearchResponse(tracks=tracks, albums=albums, artists=artists, playlists=playlists) return SearchResponse(tracks=tracks, albums=albums, artists=artists, playlists=playlists)
@staticmethod
def parse_artist(artist_dict) -> SpotifyArtist:
name = artist_dict.get('name', None)
href = artist_dict.get('href', None)
uri = artist_dict.get('uri', None)
genres = artist_dict.get('genres', None)
popularity = artist_dict.get('popularity', None)
if name is None:
raise KeyError('artist name not found')
return SpotifyArtist(name,
href=href,
uri=uri,
genres=genres,
popularity=popularity)
def parse_album(self, album_dict) -> Union[SpotifyAlbum, LibraryAlbum]:
if 'album' in album_dict:
album = album_dict.get('album', None)
else:
album = album_dict
name = album.get('name', None)
if name is None:
raise KeyError('album name not found')
artists = [self.parse_artist(i) for i in album.get('artists', [])]
if album.get("album_type") is not None:
album_type = SpotifyAlbum.Type[album.get('album_type').lower()]
else:
album_type = SpotifyAlbum.Type.single
href = album.get('href', None)
uri = album.get('uri', None)
genres = album.get('genres', None)
if album.get('tracks'):
if 'next' in album['tracks']:
track_pager = PageCollection(net=self, page=album['tracks'])
track_pager.continue_iteration()
tracks = [self.parse_track(i) for i in track_pager.items]
else:
tracks = [self.parse_track(i) for i in album.get('tracks', [])]
else:
tracks = []
release_date = album.get('release_date', None)
release_date_precision = album.get('release_date_precision', None)
label = album.get('label', None)
popularity = album.get('popularity', None)
added_at = album_dict.get('added_at', None)
if added_at:
added_at = datetime.datetime.strptime(added_at, '%Y-%m-%dT%H:%M:%S%z')
if added_at:
return LibraryAlbum(name=name,
artists=artists,
album_type=album_type,
href=href,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity,
added_at=added_at)
else:
return SpotifyAlbum(name=name,
artists=artists,
album_type=album_type,
href=href,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity)
def parse_track(self, track_dict) -> Union[Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack]:
if 'track' in track_dict:
track = track_dict.get('track', None)
else:
track = track_dict
name = track.get('name', None)
if name is None:
raise KeyError('track name not found')
if track.get('album', None):
album = self.parse_album(track['album'])
else:
album = None
artists = [self.parse_artist(i) for i in track.get('artists', [])]
href = track.get('href', None)
uri = track.get('uri', None)
disc_number = track.get('disc_number', None)
track_number = track.get('track_number', None)
duration_ms = track.get('duration_ms', None)
explicit = track.get('explicit', None)
is_playable = track.get('is_playable', None)
popularity = track.get('popularity', None)
added_by = self.parse_user(track_dict.get('added_by')) if track_dict.get('added_by', None) else None
added_at = track_dict.get('added_at', None)
if added_at:
added_at = datetime.datetime.strptime(added_at, '%Y-%m-%dT%H:%M:%S%z')
is_local = track_dict.get('is_local', None)
played_at = track_dict.get('played_at', None)
if played_at:
played_at = datetime.datetime.strptime(played_at, '%Y-%m-%dT%H:%M:%S.%f%z')
context = track_dict.get('context', None)
if context:
context = self.parse_context(context)
if added_by or is_local:
return PlaylistTrack(name=name,
album=album,
artists=artists,
added_at=added_at,
added_by=added_by,
is_local=is_local,
href=href,
uri=uri,
disc_number=disc_number,
track_number=track_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
elif added_at:
return LibraryTrack(name=name,
album=album,
artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
track_number=track_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity,
added_at=added_at)
elif played_at or context:
return PlayedTrack(name=name,
album=album,
artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
track_number=track_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity,
played_at=played_at,
context=context)
else:
return SpotifyTrack(name=name,
album=album,
artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
track_number=track_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
@staticmethod
def parse_user(user_dict) -> User:
display_name = user_dict.get('display_name', None)
spotify_id = user_dict.get('id', None)
href = user_dict.get('href', None)
uri = user_dict.get('uri', None)
return User(spotify_id,
href=href,
uri=uri,
display_name=display_name)
def parse_playlist(self, playlist_dict) -> SpotifyPlaylist:
collaborative = playlist_dict.get('collaborative', None)
ext_spotify = None
if playlist_dict.get('external_urls', None):
if playlist_dict['external_urls'].get('spotify', None):
ext_spotify = playlist_dict['external_urls']['spotify']
href = playlist_dict.get('href', None)
description = playlist_dict.get('description', None)
name = playlist_dict.get('name', None)
if playlist_dict.get('owner', None):
owner = self.parse_user(playlist_dict.get('owner'))
else:
owner = None
public = playlist_dict.get('public', None)
uri = playlist_dict.get('uri', None)
images = playlist_dict.get('images', [])
images.sort(key=lambda x: x.get('height', 0))
images = [i.get('url') for i in images]
return SpotifyPlaylist(uri=uri,
name=name,
owner=owner,
description=description,
href=href,
collaborative=collaborative,
public=public,
ext_spotify=ext_spotify,
images=images)
@staticmethod
def parse_context(context_dict) -> Context:
return Context(object_type=context_dict['type'],
href=context_dict['href'],
external_spot=context_dict['external_urls']['spotify'],
uri=context_dict['uri'])
def parse_currently_playing(self, play_dict) -> CurrentlyPlaying:
return CurrentlyPlaying(
context=self.parse_context(play_dict['context']) if play_dict['context'] is not None else None,
timestamp=datetime.datetime.fromtimestamp(play_dict['timestamp'] / 1000),
progress_ms=play_dict['progress_ms'],
is_playing=play_dict['is_playing'],
track=self.parse_track(play_dict['item']),
device=self.parse_device(play_dict['device']),
shuffle=play_dict['shuffle_state'],
repeat=play_dict['repeat_state'],
currently_playing_type=play_dict['currently_playing_type'])
@staticmethod
def parse_device(device_dict) -> Device:
return Device(device_id=device_dict['id'],
is_active=device_dict['is_active'],
is_private_session=device_dict['is_private_session'],
is_restricted=device_dict['is_restricted'],
name=device_dict['name'],
object_type=Device.DeviceType[device_dict['type'].upper()],
volume=device_dict['volume_percent'])
@staticmethod
def parse_audio_features(feature_dict) -> AudioFeatures:
return AudioFeatures(acousticness=feature_dict['acousticness'],
analysis_url=feature_dict['analysis_url'],
danceability=feature_dict['danceability'],
duration_ms=feature_dict['duration_ms'],
energy=feature_dict['energy'],
uri=Uri(feature_dict['uri']),
instrumentalness=feature_dict['instrumentalness'],
key=feature_dict['key'],
liveness=feature_dict['liveness'],
loudness=feature_dict['loudness'],
mode=feature_dict['mode'],
speechiness=feature_dict['speechiness'],
tempo=feature_dict['tempo'],
time_signature=feature_dict['time_signature'],
track_href=feature_dict['track_href'],
valence=feature_dict['valence'])
@staticmethod @staticmethod
def chunk(l, n): def chunk(l, n):
for i in range(0, len(l), n): for i in range(0, len(l), n):

View File

@ -1,37 +1,34 @@
from __future__ import annotations from __future__ import annotations
import requests import requests
from spotframework.model.user import User from spotframework.model.user import PublicUser
from spotframework.util.console import Color from spotframework.util.console import Color
from dataclasses import dataclass, field
from base64 import b64encode from base64 import b64encode
import logging import logging
import time import time
from typing import Optional from typing import Optional, List
from datetime import datetime, timezone from datetime import datetime, timezone
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NetworkUser(User): @dataclass
class NetworkUser:
def __init__(self, client_id, client_secret, refresh_token, access_token=None): access_token: str
super().__init__(None) refresh_token: str
self.access_token = access_token client_id: str
self.refresh_token = refresh_token client_secret: str
self.client_id = client_id user: PublicUser = field(default=None, init=False)
self.client_secret = client_secret
self.last_refreshed = None last_refreshed: datetime = field(default=None, init=False)
self.token_expiry = None token_expiry: datetime = field(default=None, init=False)
self.on_refresh = [] on_refresh: List = field(default_factory=list, init=False)
self.refresh_counter = 0 refresh_counter: int = field(default=0, init=False)
def __repr__(self):
return Color.RED + Color.BOLD + 'NetworkUser' + Color.END + \
f': {self.username}, {self.display_name}, {self.uri}'
def refresh_access_token(self) -> NetworkUser: def refresh_access_token(self) -> NetworkUser:
@ -70,7 +67,7 @@ class NetworkUser(User):
if retry_after: if retry_after:
logger.warning(f'rate limit reached: retrying in {retry_after} seconds') logger.warning(f'rate limit reached: retrying in {retry_after} seconds')
time.sleep(int(retry_after) + 1) time.sleep(int(retry_after) + 1)
return self.refresh_token() return self.refresh_access_token()
else: else:
logger.error('rate limit reached: cannot find Retry-After header') logger.error('rate limit reached: cannot find Retry-After header')
@ -82,23 +79,7 @@ class NetworkUser(User):
return self return self
def refresh_info(self) -> None: def refresh_info(self) -> None:
info = self.get_info() self.user = PublicUser(**self.get_info())
if info.get('display_name', None):
self.display_name = info['display_name']
if info.get('external_urls', None):
if info['external_urls'].get('spotify', None):
self.ext_spotify = info['external_urls']['spotify']
if info.get('href', None):
self.href = info['href']
if info.get('id', None):
self.username = info['id']
if info.get('uri', None):
self.uri = info['uri']
def get_info(self) -> Optional[dict]: def get_info(self) -> Optional[dict]:
@ -123,7 +104,7 @@ class NetworkUser(User):
elif req.status_code == 401: elif req.status_code == 401:
logger.warning('access token expired, refreshing') logger.warning('access token expired, refreshing')
self.refresh_token() self.refresh_access_token()
if self.refresh_counter < 5: if self.refresh_counter < 5:
self.refresh_counter += 1 self.refresh_counter += 1
return self.get_info() return self.get_info()

View File

@ -1,8 +1,8 @@
from spotframework.net.network import Network from spotframework.net.network import Network
from spotframework.model.track import SpotifyTrack from spotframework.model.track import SimplifiedTrack, Context, Device
from spotframework.model.album import SpotifyAlbum from spotframework.model.album import AlbumFull
from spotframework.model.playlist import SpotifyPlaylist from spotframework.model.playlist import FullPlaylist
from spotframework.model.service import Context, Device from spotframework.model.uri import Uri
from typing import List, Union from typing import List, Union
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -16,7 +16,7 @@ class Player:
self.last_status = None self.last_status = None
def __str__(self): def __str__(self):
return f'{self.net.user.username} - {self.status}' return f'{self.net.user.user.display_name} - {self.status}'
def __repr__(self): def __repr__(self):
return f'Player: {self.net.user} - {self.status}' return f'Player: {self.net.user} - {self.status}'
@ -33,8 +33,9 @@ class Player:
return self.last_status return self.last_status
def play(self, def play(self,
context: Union[Context, SpotifyAlbum, SpotifyPlaylist] = None, context: Union[Context, AlbumFull, FullPlaylist] = None,
tracks: List[SpotifyTrack] = None, tracks: List[SimplifiedTrack] = None,
uris: List = None,
device: Device = None, device: Device = None,
device_name: str = None): device_name: str = None):
if device_name: if device_name:
@ -42,23 +43,30 @@ class Player:
if searched_device: if searched_device:
device = searched_device device = searched_device
if context and tracks: if context and (tracks or uris):
raise Exception('cant execute context and track list') raise Exception('cant execute context and track list')
if context: if context:
if device: if device:
self.net.play(uri=context.uri, deviceid=device.device_id) self.net.play(uri=context.uri, deviceid=device.id)
else: else:
self.net.play(uri=context.uri) self.net.play(uri=context.uri)
elif tracks: elif tracks or uris:
if tracks is None:
tracks = []
if uris is None:
uris = []
if device: if device:
self.net.play(uris=[i.uri for i in tracks], deviceid=device.device_id) self.net.play(uris=[i.uri for i in tracks] + uris, deviceid=device.id)
else: else:
self.net.play(uris=[i.uri for i in tracks]) self.net.play(uris=[i.uri for i in tracks] + uris)
else: else:
self.net.play() self.net.play()
def change_device(self, device: Device): def change_device(self, device: Device):
self.net.change_playback_device(device.device_id) self.net.change_playback_device(device.id)
def pause(self): def pause(self):
self.net.pause() self.net.pause()
@ -88,7 +96,7 @@ class Player:
raise TypeError(f'{state} is not bool') raise TypeError(f'{state} is not bool')
else: else:
status = self.status status = self.status
if status.shuffle: if status.shuffle_state:
self.shuffle(state=False) self.shuffle(state=False)
else: else:
self.shuffle(state=True) self.shuffle(state=True)
@ -97,7 +105,7 @@ class Player:
if 0 <= int(value) <= 100: if 0 <= int(value) <= 100:
if device: if device:
self.net.set_volume(value, deviceid=device.device_id) self.net.set_volume(value, deviceid=device.id)
else: else:
self.net.set_volume(value) self.net.set_volume(value)
else: else:

View File

@ -1,4 +1,5 @@
import math import math
from spotframework.model.uri import Uri
def convert_ms_to_minute_string(ms): def convert_ms_to_minute_string(ms):
@ -6,3 +7,11 @@ def convert_ms_to_minute_string(ms):
minutes = math.floor(seconds / 60) minutes = math.floor(seconds / 60)
return f'{minutes}:{math.floor(seconds%60)}' return f'{minutes}:{math.floor(seconds%60)}'
def validate_uri_string(uri_string: str):
try:
uri = Uri(uri_string)
return uri
except ValueError:
return False