added audio features and wrapping functions

This commit is contained in:
aj 2019-10-03 00:58:40 +01:00
parent 47a7f74c98
commit af0abe0285
8 changed files with 423 additions and 23 deletions

View File

@ -0,0 +1,91 @@
from spotframework.engine.processor.abstract import BatchSingleProcessor
from abc import ABC, abstractmethod
from typing import List
from spotframework.model.track import SpotifyTrack
from spotframework.model.uri import Uri
class AudioFeaturesProcessor(BatchSingleProcessor, ABC):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
append_malformed: bool = True):
super().__init__(names=names,
uris=uris)
self.append_malformed = append_malformed
def process(self, tracks: List[SpotifyTrack]) -> List[SpotifyTrack]:
return_tracks = []
malformed_tracks = []
for track in tracks:
if isinstance(track, SpotifyTrack) and track.audio_features is not None:
return_tracks.append(track)
else:
malformed_tracks.append(track)
return_tracks = super().process(return_tracks)
if self.append_malformed:
return_tracks += malformed_tracks
return return_tracks
class FloatFilter(AudioFeaturesProcessor, ABC):
def __init__(self,
names: List[str] = None,
uris: List[Uri] = None,
append_malformed: bool = True,
boundary: float = None,
greater_than: bool = True):
super().__init__(names=names,
uris=uris,
append_malformed=append_malformed)
self.boundary = boundary
self.greater_than = greater_than
@abstractmethod
def get_variable_value(self, track: SpotifyTrack) -> float:
pass
def process_single(self, track: SpotifyTrack):
if self.greater_than:
if self.get_variable_value(track) > self.boundary:
return track
else:
return None
else:
if self.get_variable_value(track) < self.boundary:
return track
else:
return None
class EnergyFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float:
return track.audio_features.energy
class ValenceFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float:
return track.audio_features.valence
class TempoFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float:
return track.audio_features.tempo
class DanceabilityFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float:
return track.audio_features.danceability
class AcousticnessFilter(FloatFilter):
def get_variable_value(self, track: SpotifyTrack) -> float:
return track.audio_features.acousticness

View File

@ -30,6 +30,11 @@ class Album:
return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + \
f': {self.name}, [{self.artists}]'
@staticmethod
def wrap(name: str = None,
artists: Union[str, List[str]] = None):
return Album(name=name, artists=[Artist(i) for i in artists])
class SpotifyAlbum(Album):
def __init__(self,
@ -56,6 +61,9 @@ class SpotifyAlbum(Album):
else:
self.uri = uri
if self.uri.object_type != Uri.ObjectType.album:
raise TypeError('provided uri not for an album')
self.genres = genres
self.tracks = tracks
@ -69,6 +77,16 @@ class SpotifyAlbum(Album):
return Color.DARKCYAN + Color.BOLD + 'SpotifyAlbum' + Color.END + \
f': {self.name}, {self.artists}, {self.uri}, {self.tracks}'
@staticmethod
def wrap(uri: Uri = None,
name: str = None,
artists: Union[str, List[str]] = None):
if uri:
return SpotifyAlbum(name=None, artists=None, uri=uri)
else:
return super().wrap(name=name, artists=artists)
class LibraryAlbum(SpotifyAlbum):
def __init__(self,

View File

@ -34,6 +34,9 @@ class SpotifyArtist(Artist):
else:
self.uri = uri
if self.uri.object_type != Uri.ObjectType.artist:
raise TypeError('provided uri not for an artist')
self.genres = genres
self.popularity = popularity
@ -41,3 +44,7 @@ class SpotifyArtist(Artist):
def __repr__(self):
return Color.PURPLE + Color.BOLD + 'SpotifyArtist' + Color.END + \
f': {self.name}, {self.uri}'
@staticmethod
def wrap(uri: Uri):
return SpotifyArtist(name=None, uri=uri)

View File

@ -142,6 +142,9 @@ class SpotifyPlaylist(Playlist):
else:
self.uri = uri
if self.uri.object_type != Uri.ObjectType.playlist:
raise TypeError('provided uri not for a playlist')
self.collaborative = collaborative
self.public = public
self.ext_spotify = ext_spotify

View File

@ -4,6 +4,8 @@ from typing import List, Union
from datetime import datetime
from spotframework.model.uri import Uri
from spotframework.util.console import Color
from spotframework.util import convert_ms_to_minute_string
from enum import Enum
if TYPE_CHECKING:
from spotframework.model.album import Album
from spotframework.model.artist import Artist
@ -51,6 +53,15 @@ class Track:
return Color.YELLOW + Color.BOLD + 'Track' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}'
@staticmethod
def wrap(name: str = None,
artists: List[str] = None,
album: str = None,
album_artists: List[str] = None):
return Track(name=name,
album=Album.wrap(name=album, artists=album_artists),
artists=[Artist(i) for i in artists])
class SpotifyTrack(Track):
def __init__(self,
@ -66,7 +77,9 @@ class SpotifyTrack(Track):
explicit: bool = None,
is_playable: bool = None,
popularity: int = None
popularity: int = None,
audio_features: AudioFeatures = None
):
super().__init__(name=name, album=album, artists=artists,
disc_number=disc_number,
@ -78,17 +91,35 @@ class SpotifyTrack(Track):
self.uri = Uri(uri)
else:
self.uri = uri
if self.uri.object_type != Uri.ObjectType.track:
raise TypeError('provided uri not for a track')
self.is_playable = is_playable
self.popularity = popularity
self.audio_features = audio_features
def __repr__(self):
return Color.BOLD + Color.YELLOW + 'SpotifyTrack' + Color.END + \
string = Color.BOLD + Color.YELLOW + 'SpotifyTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
@staticmethod
def get_uri_shell(uri):
def wrap(uri: Uri = None,
name: str = None,
artists: Union[str, List[str]] = None,
album: str = None,
album_artists: Union[str, List[str]] = None):
if uri:
return SpotifyTrack(name=None, album=None, artists=None, uri=uri)
else:
return super().wrap(name=name, artists=artists, album=album, album_artists=album_artists)
class LibraryTrack(SpotifyTrack):
@ -107,6 +138,8 @@ class LibraryTrack(SpotifyTrack):
popularity: int = None,
audio_features: AudioFeatures = None,
added_at: datetime = None
):
super().__init__(name=name, album=album, artists=artists,
@ -117,14 +150,20 @@ class LibraryTrack(SpotifyTrack):
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
popularity=popularity,
audio_features=audio_features)
self.added_at = added_at
def __repr__(self):
return Color.BOLD + Color.YELLOW + 'LibraryTrack' + Color.END + \
string = Color.BOLD + Color.YELLOW + 'LibraryTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.added_at}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
class PlaylistTrack(SpotifyTrack):
def __init__(self,
@ -144,7 +183,9 @@ class PlaylistTrack(SpotifyTrack):
explicit: bool = None,
is_playable: bool = None,
popularity: int = None
popularity: int = None,
audio_features: AudioFeatures = None
):
super().__init__(name=name, album=album, artists=artists,
href=href,
@ -154,16 +195,22 @@ class PlaylistTrack(SpotifyTrack):
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
popularity=popularity,
audio_features=audio_features)
self.added_at = added_at
self.added_by = added_by
self.is_local = is_local
def __repr__(self):
return Color.BOLD + Color.YELLOW + 'PlaylistTrack' + Color.END + \
string = Color.BOLD + Color.YELLOW + 'PlaylistTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.added_at}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
class PlayedTrack(SpotifyTrack):
def __init__(self,
@ -181,6 +228,8 @@ class PlayedTrack(SpotifyTrack):
popularity: int = None,
audio_features: AudioFeatures = None,
played_at: datetime = None,
context: Context = None
):
@ -192,10 +241,141 @@ class PlayedTrack(SpotifyTrack):
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
popularity=popularity,
audio_features=audio_features)
self.played_at = played_at
self.context = context
def __repr__(self):
return Color.BOLD + Color.YELLOW + 'PlayedTrack' + Color.END + \
string = Color.BOLD + Color.YELLOW + 'PlayedTrack' + Color.END + \
f': {self.name}, ({self.album}), {self.artists}, {self.uri}, {self.played_at}'
if self.audio_features is not None:
string += ' ' + repr(self.audio_features)
return string
class AudioFeatures:
class Mode(Enum):
MINOR = 0
MAJOR = 1
def __init__(self,
acousticness: float,
analysis_url: str,
danceability: float,
duration_ms: int,
energy: float,
uri: Uri,
instrumentalness: float,
key: int,
liveness: float,
loudness: float,
mode: int,
speechiness: float,
tempo: float,
time_signature: int,
track_href: str,
valence: float):
self.acousticness = self.check_float(acousticness)
self.analysis_url = analysis_url
self.danceability = self.check_float(danceability)
self.duration_ms = duration_ms
self.energy = self.check_float(energy)
self.uri = uri
self.instrumentalness = self.check_float(instrumentalness)
self._key = key
self.liveness = self.check_float(liveness)
self.loudness = loudness
if mode == 0:
self.mode = self.Mode.MINOR
elif mode == 1:
self.mode = self.Mode.MAJOR
else:
raise ValueError('illegal value for mode')
self.speechiness = self.check_float(speechiness)
self.tempo = tempo
self.time_signature = time_signature
self.track_href = track_href
self.valence = self.check_float(valence)
@property
def key(self) -> str:
legend = {
0: 'C',
1: 'C#',
2: 'D',
3: 'D#',
4: 'E',
5: 'F',
6: 'F#',
7: 'G',
8: 'G#',
9: 'A',
10: 'A#',
11: 'B'
}
if legend.get(self._key, None):
return legend.get(self._key, None)
else:
raise ValueError('key value out of bounds')
@key.setter
def key(self, value):
if isinstance(value, int):
if 0 <= value <= 11:
self._key = value
else:
raise ValueError('key value out of bounds')
else:
raise ValueError('key value not integer')
def is_live(self):
if self.liveness is not None:
if self.liveness > 0.8:
return True
else:
return False
else:
raise ValueError('no value for liveness')
def is_instrumental(self):
if self.instrumentalness is not None:
if self.instrumentalness > 0.5:
return True
else:
return False
else:
raise ValueError('no value for instrumentalness')
def is_spoken_word(self):
if self.speechiness is not None:
if self.speechiness > 0.66:
return True
else:
return False
else:
raise ValueError('no value for speechiness')
@staticmethod
def check_float(value):
value = float(value)
if isinstance(value, float):
if 0 <= value <= 1:
return value
else:
raise ValueError(f'value {value} out of bounds')
else:
raise ValueError(f'value {value} is not float')
def __repr__(self):
return Color.BOLD + Color.DARKCYAN + 'AudioFeatures' + Color.END + \
f': acoustic:{self.acousticness}, dance:{self.danceability}, ' \
f'duration:{convert_ms_to_minute_string(self.duration_ms)}, energy:{self.energy}, ' \
f'instrumental:{self.instrumentalness}, key:{self.key}, live:{self.liveness}, ' \
f'volume:{self.loudness}db, mode:{self.mode.name}, speech:{self.speechiness}, tempo:{self.tempo}, ' \
f'time_sig:{self.time_signature}, valence:{self.valence}'

View File

@ -10,7 +10,7 @@ from spotframework.model.user import User
from . import const
from spotframework.net.user import NetworkUser
from spotframework.model.playlist import SpotifyPlaylist
from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack
from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack, AudioFeatures
from spotframework.model.album import LibraryAlbum, SpotifyAlbum
from spotframework.model.service import CurrentlyPlaying, Device, Context
from spotframework.model.uri import Uri
@ -139,6 +139,7 @@ class Network:
return None
def get_playlist(self, uri: Uri) -> Optional[SpotifyPlaylist]:
"""get playlist object with tracks for uri"""
logger.info(f"{uri}")
@ -147,7 +148,7 @@ class Network:
if tracks is not None:
playlist = SpotifyPlaylist(uri)
playlist.tracks += tracks
playlist += tracks
return playlist
else:
@ -160,6 +161,7 @@ class Network:
public: bool = True,
collaborative: bool = False,
description: bool = None) -> Optional[SpotifyPlaylist]:
"""create playlist for user"""
json = {"name": name, "public": public, "collaborative": collaborative}
@ -175,6 +177,7 @@ class Network:
return None
def get_playlists(self, response_limit: int = None) -> Optional[List[SpotifyPlaylist]]:
"""get current users playlists"""
logger.info(f"loading")
@ -188,6 +191,7 @@ class Network:
return return_items
def get_library_albums(self, response_limit: int = None) -> Optional[List[LibraryAlbum]]:
"""get user library albums"""
logger.info(f"loading")
@ -200,7 +204,8 @@ class Network:
return return_items
def get_library_tracks(self, response_limit: int = None) -> Optional[List[LibraryAlbum]]:
def get_library_tracks(self, response_limit: int = None) -> Optional[List[LibraryTrack]]:
"""get user library tracks"""
logger.info(f"loading")
@ -214,6 +219,7 @@ class Network:
return return_items
def get_user_playlists(self) -> Optional[List[SpotifyPlaylist]]:
"""filter user playlists for those that were user created"""
logger.info('retrieved')
@ -226,6 +232,7 @@ class Network:
return None
def get_playlist_tracks(self, uri: Uri, response_limit: int = None) -> List[PlaylistTrack]:
"""get list of playlists tracks for uri"""
logger.info(f"loading")
@ -239,6 +246,7 @@ class Network:
return return_items
def get_available_devices(self) -> Optional[List[Device]]:
"""get users available devices"""
logger.info("retrieving")
@ -253,6 +261,8 @@ class Network:
response_limit: int = None,
after: datetime.datetime = None,
before: datetime.datetime = None) -> Optional[List[PlayedTrack]]:
"""get list of recently played tracks"""
logger.info("retrieving")
params = dict()
@ -279,6 +289,7 @@ class Network:
return None
def get_player(self) -> Optional[CurrentlyPlaying]:
"""get currently playing snapshot (player)"""
logger.info("retrieved")
@ -289,21 +300,23 @@ class Network:
logger.info('no player returned')
return None
def get_device_id(self, devicename: str) -> Optional[str]:
def get_device_id(self, device_name: str) -> Optional[str]:
"""return device id of device as searched for by name"""
logger.info(f"{devicename}")
logger.info(f"{device_name}")
devices = self.get_available_devices()
if devices:
device = next((i for i in devices if i.name == devicename), None)
device = next((i for i in devices if i.name == device_name), None)
if device:
return device.device_id
else:
logger.error(f'{devicename} not found')
logger.error(f'{device_name} not found')
else:
logger.error('no devices returned')
def change_playback_device(self, device_id: str):
"""migrate playback to different device"""
logger.info(device_id)
@ -319,6 +332,7 @@ class Network:
return None
def play(self, uri: Uri = None, uris: List[Uri] = None, deviceid: str = None) -> Optional[Response]:
"""begin playback"""
logger.info(f"{uri}{' ' + deviceid if deviceid is not None else ''}")
@ -344,6 +358,7 @@ class Network:
logger.error('error playing')
def pause(self, deviceid: str = None) -> Optional[Response]:
"""pause playback"""
logger.info(f"{deviceid if deviceid is not None else ''}")
@ -359,6 +374,7 @@ class Network:
logger.error('error pausing')
def next(self, deviceid: str = None) -> Optional[Response]:
"""skip track playback"""
logger.info(f"{deviceid if deviceid is not None else ''}")
@ -374,6 +390,7 @@ class Network:
logger.error('error skipping')
def previous(self, deviceid: str = None) -> Optional[Response]:
"""skip playback backwards"""
logger.info(f"{deviceid if deviceid is not None else ''}")
@ -590,6 +607,59 @@ class Network:
else:
logger.error('error reordering playlist')
def get_track_audio_features(self, uris: List[Uri]):
logger.info(f'ids: {len(uris)}')
audio_features = []
chunked_uris = list(self.chunk(uris, 100))
for chunk in chunked_uris:
resp = self.make_get_request('getAudioFeatures',
url='audio-features',
params={'ids': ','.join(i.object_id for i in chunk)})
if resp:
if resp.get('audio_features', None):
parsed_features = [self.parse_audio_features(i) for i in resp['audio_features']]
audio_features += parsed_features
else:
logger.error('no audio features included')
else:
logger.error('no response')
if len(audio_features) == len(uris):
return audio_features
else:
logger.error('mismatched length of input and response')
def populate_track_audio_features(self, tracks=Union[SpotifyTrack, List[SpotifyTrack]]):
logger.info(f'populating')
if isinstance(tracks, SpotifyTrack):
audio_features = self.get_track_audio_features([tracks.uri])
if audio_features:
if len(audio_features) == 1:
tracks.audio_features = audio_features[0]
return tracks
else:
logger.error(f'{len(audio_features)} features returned')
else:
logger.error(f'no audio features returned for {tracks.uri}')
elif isinstance(tracks, List):
if all(isinstance(i, SpotifyTrack) for i in tracks):
audio_features = self.get_track_audio_features([i.uri for i in tracks])
if audio_features:
for index, track in enumerate(tracks):
track.audio_features = audio_features[index]
return tracks
else:
logger.error(f'no audio features returned')
else:
raise TypeError('must provide either single or list of spotify tracks')
@staticmethod
def parse_artist(artist_dict) -> SpotifyArtist:
@ -683,7 +753,7 @@ class Network:
label=label,
popularity=popularity)
def parse_track(self, track_dict) -> Union[Track, SpotifyTrack, PlaylistTrack, PlayedTrack]:
def parse_track(self, track_dict) -> Union[Track, SpotifyTrack, PlaylistTrack, PlayedTrack, LibraryTrack]:
if 'track' in track_dict:
track = track_dict.get('track', None)
@ -861,6 +931,30 @@ class Network:
object_type=Device.DeviceType[device_dict['type'].upper()],
volume=device_dict['volume_percent'])
@staticmethod
def parse_audio_features(feature_dict) -> AudioFeatures:
return AudioFeatures(acousticness=feature_dict['acousticness'],
analysis_url=feature_dict['analysis_url'],
danceability=feature_dict['danceability'],
duration_ms=feature_dict['duration_ms'],
energy=feature_dict['energy'],
uri=Uri(feature_dict['uri']),
instrumentalness=feature_dict['instrumentalness'],
key=feature_dict['key'],
liveness=feature_dict['liveness'],
loudness=feature_dict['loudness'],
mode=feature_dict['mode'],
speechiness=feature_dict['speechiness'],
tempo=feature_dict['tempo'],
time_signature=feature_dict['time_signature'],
track_href=feature_dict['track_href'],
valence=feature_dict['valence'])
@staticmethod
def chunk(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
class PageCollection:
def __init__(self,

View File

@ -13,14 +13,13 @@ class Player:
def __init__(self,
net: Network):
self.net = net
self.user = net.user
self.last_status = None
def __str__(self):
return f'{self.user.username} - {self.status}'
return f'{self.net.user.username} - {self.status}'
def __repr__(self):
return f'Player: {self.user} - {self.status}'
return f'Player: {self.net.user} - {self.status}'
@property
def available_devices(self):
@ -94,7 +93,7 @@ class Player:
else:
self.shuffle(state=True)
def set_volume(self, value: int, device: Device = None):
def volume(self, value: int, device: Device = None):
if 0 <= int(value) <= 100:
if device:

View File

@ -0,0 +1,8 @@
import math
def convert_ms_to_minute_string(ms):
seconds = ms / 1000
minutes = math.floor(seconds / 60)
return f'{minutes}:{math.floor(seconds%60)}'