diff --git a/spotframework/engine/processor/audio_features.py b/spotframework/engine/processor/audio_features.py new file mode 100644 index 0000000..a0556ae --- /dev/null +++ b/spotframework/engine/processor/audio_features.py @@ -0,0 +1,91 @@ +from spotframework.engine.processor.abstract import BatchSingleProcessor +from abc import ABC, abstractmethod +from typing import List +from spotframework.model.track import SpotifyTrack +from spotframework.model.uri import Uri + + +class AudioFeaturesProcessor(BatchSingleProcessor, ABC): + + def __init__(self, + names: List[str] = None, + uris: List[Uri] = None, + append_malformed: bool = True): + super().__init__(names=names, + uris=uris) + self.append_malformed = append_malformed + + def process(self, tracks: List[SpotifyTrack]) -> List[SpotifyTrack]: + + return_tracks = [] + malformed_tracks = [] + + for track in tracks: + + if isinstance(track, SpotifyTrack) and track.audio_features is not None: + return_tracks.append(track) + else: + malformed_tracks.append(track) + + return_tracks = super().process(return_tracks) + + if self.append_malformed: + return_tracks += malformed_tracks + + return return_tracks + + +class FloatFilter(AudioFeaturesProcessor, ABC): + + def __init__(self, + names: List[str] = None, + uris: List[Uri] = None, + append_malformed: bool = True, + boundary: float = None, + greater_than: bool = True): + super().__init__(names=names, + uris=uris, + append_malformed=append_malformed) + self.boundary = boundary + self.greater_than = greater_than + + @abstractmethod + def get_variable_value(self, track: SpotifyTrack) -> float: + pass + + def process_single(self, track: SpotifyTrack): + if self.greater_than: + if self.get_variable_value(track) > self.boundary: + return track + else: + return None + else: + if self.get_variable_value(track) < self.boundary: + return track + else: + return None + + +class EnergyFilter(FloatFilter): + def get_variable_value(self, track: SpotifyTrack) -> float: + return track.audio_features.energy + + +class ValenceFilter(FloatFilter): + def get_variable_value(self, track: SpotifyTrack) -> float: + return track.audio_features.valence + + +class TempoFilter(FloatFilter): + def get_variable_value(self, track: SpotifyTrack) -> float: + return track.audio_features.tempo + + +class DanceabilityFilter(FloatFilter): + def get_variable_value(self, track: SpotifyTrack) -> float: + return track.audio_features.danceability + + +class AcousticnessFilter(FloatFilter): + def get_variable_value(self, track: SpotifyTrack) -> float: + return track.audio_features.acousticness diff --git a/spotframework/model/album.py b/spotframework/model/album.py index 54843c2..4986f4c 100644 --- a/spotframework/model/album.py +++ b/spotframework/model/album.py @@ -30,6 +30,11 @@ class Album: return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + \ f': {self.name}, [{self.artists}]' + @staticmethod + def wrap(name: str = None, + artists: Union[str, List[str]] = None): + return Album(name=name, artists=[Artist(i) for i in artists]) + class SpotifyAlbum(Album): def __init__(self, @@ -56,6 +61,9 @@ class SpotifyAlbum(Album): else: self.uri = uri + if self.uri.object_type != Uri.ObjectType.album: + raise TypeError('provided uri not for an album') + self.genres = genres self.tracks = tracks @@ -69,6 +77,16 @@ class SpotifyAlbum(Album): return Color.DARKCYAN + Color.BOLD + 'SpotifyAlbum' + Color.END + \ f': {self.name}, {self.artists}, {self.uri}, {self.tracks}' + @staticmethod + def wrap(uri: Uri = None, + name: str = None, + artists: Union[str, List[str]] = None): + + if uri: + return SpotifyAlbum(name=None, artists=None, uri=uri) + else: + return super().wrap(name=name, artists=artists) + class LibraryAlbum(SpotifyAlbum): def __init__(self, diff --git a/spotframework/model/artist.py b/spotframework/model/artist.py index a0cd859..db9c51c 100644 --- a/spotframework/model/artist.py +++ b/spotframework/model/artist.py @@ -34,6 +34,9 @@ class SpotifyArtist(Artist): else: self.uri = uri + if self.uri.object_type != Uri.ObjectType.artist: + raise TypeError('provided uri not for an artist') + self.genres = genres self.popularity = popularity @@ -41,3 +44,7 @@ class SpotifyArtist(Artist): def __repr__(self): return Color.PURPLE + Color.BOLD + 'SpotifyArtist' + Color.END + \ f': {self.name}, {self.uri}' + + @staticmethod + def wrap(uri: Uri): + return SpotifyArtist(name=None, uri=uri) diff --git a/spotframework/model/playlist.py b/spotframework/model/playlist.py index 364ad0c..4d4678c 100644 --- a/spotframework/model/playlist.py +++ b/spotframework/model/playlist.py @@ -142,6 +142,9 @@ class SpotifyPlaylist(Playlist): else: self.uri = uri + if self.uri.object_type != Uri.ObjectType.playlist: + raise TypeError('provided uri not for a playlist') + self.collaborative = collaborative self.public = public self.ext_spotify = ext_spotify diff --git a/spotframework/model/track.py b/spotframework/model/track.py index e03e1b3..fd305ec 100644 --- a/spotframework/model/track.py +++ b/spotframework/model/track.py @@ -4,6 +4,8 @@ from typing import List, Union from datetime import datetime from spotframework.model.uri import Uri from spotframework.util.console import Color +from spotframework.util import convert_ms_to_minute_string +from enum import Enum if TYPE_CHECKING: from spotframework.model.album import Album from spotframework.model.artist import Artist @@ -51,6 +53,15 @@ class Track: return Color.YELLOW + Color.BOLD + 'Track' + Color.END + \ f': {self.name}, ({self.album}), {self.artists}' + @staticmethod + def wrap(name: str = None, + artists: List[str] = None, + album: str = None, + album_artists: List[str] = None): + return Track(name=name, + album=Album.wrap(name=album, artists=album_artists), + artists=[Artist(i) for i in artists]) + class SpotifyTrack(Track): def __init__(self, @@ -66,7 +77,9 @@ class SpotifyTrack(Track): explicit: bool = None, is_playable: bool = None, - popularity: int = None + popularity: int = None, + + audio_features: AudioFeatures = None ): super().__init__(name=name, album=album, artists=artists, disc_number=disc_number, @@ -78,17 +91,35 @@ class SpotifyTrack(Track): self.uri = Uri(uri) else: self.uri = uri + + if self.uri.object_type != Uri.ObjectType.track: + raise TypeError('provided uri not for a track') + self.is_playable = is_playable self.popularity = popularity + self.audio_features = audio_features + def __repr__(self): - return Color.BOLD + Color.YELLOW + 'SpotifyTrack' + Color.END + \ + string = Color.BOLD + Color.YELLOW + 'SpotifyTrack' + Color.END + \ f': {self.name}, ({self.album}), {self.artists}, {self.uri}' + if self.audio_features is not None: + string += ' ' + repr(self.audio_features) + + return string + @staticmethod - def get_uri_shell(uri): - return SpotifyTrack(name=None, album=None, artists=None, uri=uri) + def wrap(uri: Uri = None, + name: str = None, + artists: Union[str, List[str]] = None, + album: str = None, + album_artists: Union[str, List[str]] = None): + if uri: + return SpotifyTrack(name=None, album=None, artists=None, uri=uri) + else: + return super().wrap(name=name, artists=artists, album=album, album_artists=album_artists) class LibraryTrack(SpotifyTrack): @@ -107,6 +138,8 @@ class LibraryTrack(SpotifyTrack): popularity: int = None, + audio_features: AudioFeatures = None, + added_at: datetime = None ): super().__init__(name=name, album=album, artists=artists, @@ -117,14 +150,20 @@ class LibraryTrack(SpotifyTrack): duration_ms=duration_ms, explicit=explicit, is_playable=is_playable, - popularity=popularity) + popularity=popularity, + audio_features=audio_features) self.added_at = added_at def __repr__(self): - return Color.BOLD + Color.YELLOW + 'LibraryTrack' + Color.END + \ + string = Color.BOLD + Color.YELLOW + 'LibraryTrack' + Color.END + \ f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.added_at}' + if self.audio_features is not None: + string += ' ' + repr(self.audio_features) + + return string + class PlaylistTrack(SpotifyTrack): def __init__(self, @@ -144,7 +183,9 @@ class PlaylistTrack(SpotifyTrack): explicit: bool = None, is_playable: bool = None, - popularity: int = None + popularity: int = None, + + audio_features: AudioFeatures = None ): super().__init__(name=name, album=album, artists=artists, href=href, @@ -154,16 +195,22 @@ class PlaylistTrack(SpotifyTrack): duration_ms=duration_ms, explicit=explicit, is_playable=is_playable, - popularity=popularity) + popularity=popularity, + audio_features=audio_features) self.added_at = added_at self.added_by = added_by self.is_local = is_local def __repr__(self): - return Color.BOLD + Color.YELLOW + 'PlaylistTrack' + Color.END + \ + string = Color.BOLD + Color.YELLOW + 'PlaylistTrack' + Color.END + \ f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.added_at}' + if self.audio_features is not None: + string += ' ' + repr(self.audio_features) + + return string + class PlayedTrack(SpotifyTrack): def __init__(self, @@ -181,6 +228,8 @@ class PlayedTrack(SpotifyTrack): popularity: int = None, + audio_features: AudioFeatures = None, + played_at: datetime = None, context: Context = None ): @@ -192,10 +241,141 @@ class PlayedTrack(SpotifyTrack): duration_ms=duration_ms, explicit=explicit, is_playable=is_playable, - popularity=popularity) + popularity=popularity, + audio_features=audio_features) self.played_at = played_at self.context = context def __repr__(self): - return Color.BOLD + Color.YELLOW + 'PlayedTrack' + Color.END + \ + string = Color.BOLD + Color.YELLOW + 'PlayedTrack' + Color.END + \ f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.played_at}' + + if self.audio_features is not None: + string += ' ' + repr(self.audio_features) + + return string + + +class AudioFeatures: + + class Mode(Enum): + MINOR = 0 + MAJOR = 1 + + def __init__(self, + acousticness: float, + analysis_url: str, + danceability: float, + duration_ms: int, + energy: float, + uri: Uri, + instrumentalness: float, + key: int, + liveness: float, + loudness: float, + mode: int, + speechiness: float, + tempo: float, + time_signature: int, + track_href: str, + valence: float): + self.acousticness = self.check_float(acousticness) + self.analysis_url = analysis_url + self.danceability = self.check_float(danceability) + self.duration_ms = duration_ms + self.energy = self.check_float(energy) + self.uri = uri + self.instrumentalness = self.check_float(instrumentalness) + self._key = key + self.liveness = self.check_float(liveness) + self.loudness = loudness + + if mode == 0: + self.mode = self.Mode.MINOR + elif mode == 1: + self.mode = self.Mode.MAJOR + else: + raise ValueError('illegal value for mode') + self.speechiness = self.check_float(speechiness) + self.tempo = tempo + self.time_signature = time_signature + self.track_href = track_href + self.valence = self.check_float(valence) + + @property + def key(self) -> str: + legend = { + 0: 'C', + 1: 'C#', + 2: 'D', + 3: 'D#', + 4: 'E', + 5: 'F', + 6: 'F#', + 7: 'G', + 8: 'G#', + 9: 'A', + 10: 'A#', + 11: 'B' + } + if legend.get(self._key, None): + return legend.get(self._key, None) + else: + raise ValueError('key value out of bounds') + + @key.setter + def key(self, value): + if isinstance(value, int): + if 0 <= value <= 11: + self._key = value + else: + raise ValueError('key value out of bounds') + else: + raise ValueError('key value not integer') + + def is_live(self): + if self.liveness is not None: + if self.liveness > 0.8: + return True + else: + return False + else: + raise ValueError('no value for liveness') + + def is_instrumental(self): + if self.instrumentalness is not None: + if self.instrumentalness > 0.5: + return True + else: + return False + else: + raise ValueError('no value for instrumentalness') + + def is_spoken_word(self): + if self.speechiness is not None: + if self.speechiness > 0.66: + return True + else: + return False + else: + raise ValueError('no value for speechiness') + + @staticmethod + def check_float(value): + value = float(value) + + if isinstance(value, float): + if 0 <= value <= 1: + return value + else: + raise ValueError(f'value {value} out of bounds') + else: + raise ValueError(f'value {value} is not float') + + def __repr__(self): + return Color.BOLD + Color.DARKCYAN + 'AudioFeatures' + Color.END + \ + f': acoustic:{self.acousticness}, dance:{self.danceability}, ' \ + f'duration:{convert_ms_to_minute_string(self.duration_ms)}, energy:{self.energy}, ' \ + f'instrumental:{self.instrumentalness}, key:{self.key}, live:{self.liveness}, ' \ + f'volume:{self.loudness}db, mode:{self.mode.name}, speech:{self.speechiness}, tempo:{self.tempo}, ' \ + f'time_sig:{self.time_signature}, valence:{self.valence}' diff --git a/spotframework/net/network.py b/spotframework/net/network.py index 032213a..a9eee11 100644 --- a/spotframework/net/network.py +++ b/spotframework/net/network.py @@ -10,7 +10,7 @@ from spotframework.model.user import User from . import const from spotframework.net.user import NetworkUser from spotframework.model.playlist import SpotifyPlaylist -from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack +from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack, AudioFeatures from spotframework.model.album import LibraryAlbum, SpotifyAlbum from spotframework.model.service import CurrentlyPlaying, Device, Context from spotframework.model.uri import Uri @@ -139,6 +139,7 @@ class Network: return None def get_playlist(self, uri: Uri) -> Optional[SpotifyPlaylist]: + """get playlist object with tracks for uri""" logger.info(f"{uri}") @@ -147,7 +148,7 @@ class Network: if tracks is not None: playlist = SpotifyPlaylist(uri) - playlist.tracks += tracks + playlist += tracks return playlist else: @@ -160,6 +161,7 @@ class Network: public: bool = True, collaborative: bool = False, description: bool = None) -> Optional[SpotifyPlaylist]: + """create playlist for user""" json = {"name": name, "public": public, "collaborative": collaborative} @@ -175,6 +177,7 @@ class Network: return None def get_playlists(self, response_limit: int = None) -> Optional[List[SpotifyPlaylist]]: + """get current users playlists""" logger.info(f"loading") @@ -188,6 +191,7 @@ class Network: return return_items def get_library_albums(self, response_limit: int = None) -> Optional[List[LibraryAlbum]]: + """get user library albums""" logger.info(f"loading") @@ -200,7 +204,8 @@ class Network: return return_items - def get_library_tracks(self, response_limit: int = None) -> Optional[List[LibraryAlbum]]: + def get_library_tracks(self, response_limit: int = None) -> Optional[List[LibraryTrack]]: + """get user library tracks""" logger.info(f"loading") @@ -214,6 +219,7 @@ class Network: return return_items def get_user_playlists(self) -> Optional[List[SpotifyPlaylist]]: + """filter user playlists for those that were user created""" logger.info('retrieved') @@ -226,6 +232,7 @@ class Network: return None def get_playlist_tracks(self, uri: Uri, response_limit: int = None) -> List[PlaylistTrack]: + """get list of playlists tracks for uri""" logger.info(f"loading") @@ -239,6 +246,7 @@ class Network: return return_items def get_available_devices(self) -> Optional[List[Device]]: + """get users available devices""" logger.info("retrieving") @@ -253,6 +261,8 @@ class Network: response_limit: int = None, after: datetime.datetime = None, before: datetime.datetime = None) -> Optional[List[PlayedTrack]]: + """get list of recently played tracks""" + logger.info("retrieving") params = dict() @@ -279,6 +289,7 @@ class Network: return None def get_player(self) -> Optional[CurrentlyPlaying]: + """get currently playing snapshot (player)""" logger.info("retrieved") @@ -289,21 +300,23 @@ class Network: logger.info('no player returned') return None - def get_device_id(self, devicename: str) -> Optional[str]: + def get_device_id(self, device_name: str) -> Optional[str]: + """return device id of device as searched for by name""" - logger.info(f"{devicename}") + logger.info(f"{device_name}") devices = self.get_available_devices() if devices: - device = next((i for i in devices if i.name == devicename), None) + device = next((i for i in devices if i.name == device_name), None) if device: return device.device_id else: - logger.error(f'{devicename} not found') + logger.error(f'{device_name} not found') else: logger.error('no devices returned') def change_playback_device(self, device_id: str): + """migrate playback to different device""" logger.info(device_id) @@ -319,6 +332,7 @@ class Network: return None def play(self, uri: Uri = None, uris: List[Uri] = None, deviceid: str = None) -> Optional[Response]: + """begin playback""" logger.info(f"{uri}{' ' + deviceid if deviceid is not None else ''}") @@ -344,6 +358,7 @@ class Network: logger.error('error playing') def pause(self, deviceid: str = None) -> Optional[Response]: + """pause playback""" logger.info(f"{deviceid if deviceid is not None else ''}") @@ -359,6 +374,7 @@ class Network: logger.error('error pausing') def next(self, deviceid: str = None) -> Optional[Response]: + """skip track playback""" logger.info(f"{deviceid if deviceid is not None else ''}") @@ -374,6 +390,7 @@ class Network: logger.error('error skipping') def previous(self, deviceid: str = None) -> Optional[Response]: + """skip playback backwards""" logger.info(f"{deviceid if deviceid is not None else ''}") @@ -590,6 +607,59 @@ class Network: else: logger.error('error reordering playlist') + def get_track_audio_features(self, uris: List[Uri]): + logger.info(f'ids: {len(uris)}') + + audio_features = [] + chunked_uris = list(self.chunk(uris, 100)) + for chunk in chunked_uris: + resp = self.make_get_request('getAudioFeatures', + url='audio-features', + params={'ids': ','.join(i.object_id for i in chunk)}) + + if resp: + if resp.get('audio_features', None): + parsed_features = [self.parse_audio_features(i) for i in resp['audio_features']] + audio_features += parsed_features + else: + logger.error('no audio features included') + else: + logger.error('no response') + + if len(audio_features) == len(uris): + return audio_features + else: + logger.error('mismatched length of input and response') + + def populate_track_audio_features(self, tracks=Union[SpotifyTrack, List[SpotifyTrack]]): + logger.info(f'populating') + + if isinstance(tracks, SpotifyTrack): + audio_features = self.get_track_audio_features([tracks.uri]) + + if audio_features: + if len(audio_features) == 1: + tracks.audio_features = audio_features[0] + return tracks + else: + logger.error(f'{len(audio_features)} features returned') + else: + logger.error(f'no audio features returned for {tracks.uri}') + + elif isinstance(tracks, List): + if all(isinstance(i, SpotifyTrack) for i in tracks): + audio_features = self.get_track_audio_features([i.uri for i in tracks]) + + if audio_features: + for index, track in enumerate(tracks): + track.audio_features = audio_features[index] + + return tracks + else: + logger.error(f'no audio features returned') + else: + raise TypeError('must provide either single or list of spotify tracks') + @staticmethod def parse_artist(artist_dict) -> SpotifyArtist: @@ -683,7 +753,7 @@ class Network: label=label, popularity=popularity) - def parse_track(self, track_dict) -> Union[Track, SpotifyTrack, PlaylistTrack, PlayedTrack]: + def parse_track(self, track_dict) -> Union[Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack]: if 'track' in track_dict: track = track_dict.get('track', None) @@ -861,6 +931,30 @@ class Network: object_type=Device.DeviceType[device_dict['type'].upper()], volume=device_dict['volume_percent']) + @staticmethod + def parse_audio_features(feature_dict) -> AudioFeatures: + return AudioFeatures(acousticness=feature_dict['acousticness'], + analysis_url=feature_dict['analysis_url'], + danceability=feature_dict['danceability'], + duration_ms=feature_dict['duration_ms'], + energy=feature_dict['energy'], + uri=Uri(feature_dict['uri']), + instrumentalness=feature_dict['instrumentalness'], + key=feature_dict['key'], + liveness=feature_dict['liveness'], + loudness=feature_dict['loudness'], + mode=feature_dict['mode'], + speechiness=feature_dict['speechiness'], + tempo=feature_dict['tempo'], + time_signature=feature_dict['time_signature'], + track_href=feature_dict['track_href'], + valence=feature_dict['valence']) + + @staticmethod + def chunk(l, n): + for i in range(0, len(l), n): + yield l[i:i + n] + class PageCollection: def __init__(self, diff --git a/spotframework/player/player.py b/spotframework/player/player.py index 941afec..62a5475 100644 --- a/spotframework/player/player.py +++ b/spotframework/player/player.py @@ -13,14 +13,13 @@ class Player: def __init__(self, net: Network): self.net = net - self.user = net.user self.last_status = None def __str__(self): - return f'{self.user.username} - {self.status}' + return f'{self.net.user.username} - {self.status}' def __repr__(self): - return f'Player: {self.user} - {self.status}' + return f'Player: {self.net.user} - {self.status}' @property def available_devices(self): @@ -94,7 +93,7 @@ class Player: else: self.shuffle(state=True) - def set_volume(self, value: int, device: Device = None): + def volume(self, value: int, device: Device = None): if 0 <= int(value) <= 100: if device: diff --git a/spotframework/util/__init__.py b/spotframework/util/__init__.py index e69de29..eaaeeab 100644 --- a/spotframework/util/__init__.py +++ b/spotframework/util/__init__.py @@ -0,0 +1,8 @@ +import math + + +def convert_ms_to_minute_string(ms): + seconds = ms / 1000 + minutes = math.floor(seconds / 60) + + return f'{minutes}:{math.floor(seconds%60)}'