added page iterator

This commit is contained in:
aj 2019-09-15 23:16:55 +01:00
parent 2bd26df92f
commit ef92b49ee6
4 changed files with 502 additions and 333 deletions

View File

@ -1,4 +1,5 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from typing import List, Union
from spotframework.util.console import Color
@ -67,3 +68,35 @@ class SpotifyAlbum(Album):
def __repr__(self):
return Color.DARKCYAN + Color.BOLD + 'SpotifyAlbum' + Color.END + \
f': {self.name}, {self.artists}, {self.uri}, {self.tracks}'
class LibraryAlbum(SpotifyAlbum):
def __init__(self,
name: str,
artists: List[Artist],
href: str = None,
uri: Union[str, Uri] = None,
genres: List[str] = None,
tracks: List = None,
release_date: str = None,
release_date_precision: str = None,
label: str = None,
popularity: int = None,
added_at: datetime = None
):
super().__init__(name=name,
artists=artists,
href=href,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity)
self.added_at = added_at

View File

@ -2,14 +2,17 @@ import requests
import random
import logging
import time
from typing import List, Optional
from datetime import datetime
from typing import List, Optional, Union
import datetime
from spotframework.model.artist import SpotifyArtist
from spotframework.model.user import User
from . import const
from spotframework.net.parse import parse
from spotframework.net.user import NetworkUser
from spotframework.model.playlist import SpotifyPlaylist
from spotframework.model.track import Track, PlaylistTrack, PlayedTrack
from spotframework.model.service import CurrentlyPlaying, Device
from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack, PlayedTrack
from spotframework.model.album import LibraryAlbum, SpotifyAlbum
from spotframework.model.service import CurrentlyPlaying, Device, Context
from spotframework.model.uri import Uri
from requests.models import Response
@ -23,11 +26,17 @@ class Network:
def __init__(self, user: NetworkUser):
self.user = user
def _make_get_request(self, method, url, params=None, headers={}) -> Optional[dict]:
def make_get_request(self, method, url=None, params=None, headers=None, whole_url=None) -> Optional[dict]:
if headers is None:
headers = dict()
headers['Authorization'] = 'Bearer ' + self.user.accesstoken
req = requests.get(const.api_url + url, params=params, headers=headers)
if whole_url:
req = requests.get(whole_url, params=params, headers=headers)
else:
req = requests.get(const.api_url + url, params=params, headers=headers)
if 200 <= req.status_code < 300:
logger.debug(f'{method} get {req.status_code}')
@ -44,14 +53,14 @@ class Network:
if retry_after:
logger.warning(f'{method} rate limit reached: retrying in {retry_after} seconds')
time.sleep(int(retry_after) + 1)
return self._make_get_request(method, url, params, headers)
return self.make_get_request(method, url, params, headers)
else:
logger.error(f'{method} rate limit reached: cannot find Retry-After header')
elif req.status_code == 401:
logger.warning(f'{method} access token expired, refreshing')
self.user.refresh_token()
return self._make_get_request(method, url, params, headers)
return self.make_get_request(method, url, params, headers)
else:
error_text = req.json()['error']['message']
@ -59,7 +68,10 @@ class Network:
return None
def _make_post_request(self, method, url, params=None, json=None, headers={}) -> Optional[Response]:
def make_post_request(self, method, url, params=None, json=None, headers=None) -> Optional[Response]:
if headers is None:
headers = dict()
headers['Authorization'] = 'Bearer ' + self.user.accesstoken
@ -76,14 +88,14 @@ class Network:
if retry_after:
logger.warning(f'{method} rate limit reached: retrying in {retry_after} seconds')
time.sleep(int(retry_after) + 1)
return self._make_post_request(method, url, params, json, headers)
return self.make_post_request(method, url, params, json, headers)
else:
logger.error(f'{method} rate limit reached: cannot find Retry-After header')
elif req.status_code == 401:
logger.warning(f'{method} access token expired, refreshing')
self.user.refresh_token()
return self._make_post_request(method, url, params, json, headers)
return self.make_post_request(method, url, params, json, headers)
else:
error_text = str(req.text)
@ -91,7 +103,10 @@ class Network:
return None
def _make_put_request(self, method, url, params=None, json=None, headers={}) -> Optional[Response]:
def make_put_request(self, method, url, params=None, json=None, headers=None) -> Optional[Response]:
if headers is None:
headers = dict()
headers['Authorization'] = 'Bearer ' + self.user.accesstoken
@ -108,14 +123,14 @@ class Network:
if retry_after:
logger.warning(f'{method} rate limit reached: retrying in {retry_after} seconds')
time.sleep(int(retry_after) + 1)
return self._make_put_request(method, url, params, json, headers)
return self.make_put_request(method, url, params, json, headers)
else:
logger.error(f'{method} rate limit reached: cannot find Retry-After header')
elif req.status_code == 401:
logger.warning(f'{method} access token expired, refreshing')
self.user.refresh_token()
return self._make_put_request(method, url, params, json, headers)
return self.make_put_request(method, url, params, json, headers)
else:
error_text = str(req.text)
@ -131,7 +146,7 @@ class Network:
if tracks is not None:
playlist = SpotifyPlaylist(uri.object_id)
playlist = SpotifyPlaylist(uri)
playlist.tracks += tracks
return playlist
@ -140,50 +155,50 @@ class Network:
return None
def create_playlist(self,
username,
name='New Playlist',
public=True,
collaborative=False,
description=None) -> Optional[SpotifyPlaylist]:
username: str,
name: str = 'New Playlist',
public: bool = True,
collaborative: bool = False,
description: bool = None) -> Optional[SpotifyPlaylist]:
json = {"name": name, "public": public, "collaborative": collaborative}
if description:
json['description'] = description
req = self._make_post_request('createPlaylist', f'users/{username}/playlists', json=json)
req = self.make_post_request('createPlaylist', f'users/{username}/playlists', json=json)
if 200 <= req.status_code < 300:
return parse.parse_playlist(req.json())
return self.parse_playlist(req.json())
else:
logger.error('error creating playlist')
return None
def get_playlists(self, offset=0) -> Optional[List[SpotifyPlaylist]]:
def get_playlists(self, response_limit: int = None) -> Optional[List[SpotifyPlaylist]]:
logger.info(f"{offset}")
logger.info(f"loading")
playlists = []
pager = PageCollection(net=self, url='me/playlists', name='getPlaylists')
if response_limit:
pager.total_limit = response_limit
pager.iterate()
params = {'offset': offset, 'limit': limit}
return_items = [self.parse_playlist(i) for i in pager.items]
resp = self._make_get_request('getPlaylists', 'me/playlists', params=params)
return return_items
if resp:
def get_library_albums(self, response_limit: int = None) -> Optional[List[LibraryAlbum]]:
for responseplaylist in resp['items']:
playlists.append(parse.parse_playlist(responseplaylist))
logger.info(f"loading")
if resp.get('next', None):
more_playlists = self.get_playlists(offset + limit)
if more_playlists:
playlists += more_playlists
pager = PageCollection(net=self, url='me/albums', name='getLibrary')
if response_limit:
pager.total_limit = response_limit
pager.iterate()
return playlists
return_items = [self.parse_album(i) for i in pager.items]
else:
logger.error(f'error getting playlists offset={offset}')
return None
return return_items
def get_user_playlists(self) -> Optional[List[SpotifyPlaylist]]:
@ -197,53 +212,37 @@ class Network:
logger.error('no playlists returned to filter')
return None
def get_playlist_tracks(self, uri: Uri, offset=0) -> List[PlaylistTrack]:
def get_playlist_tracks(self, uri: Uri, response_limit: int = None) -> List[PlaylistTrack]:
logger.info(f"{uri}{' ' + str(offset) if offset is not 0 else ''}")
logger.info(f"loading")
tracks = []
pager = PageCollection(net=self, url=f'playlists/{uri.object_id}/tracks', name='getPlaylistTracks')
if response_limit:
pager.total_limit = response_limit
pager.iterate()
params = {'offset': offset, 'limit': limit}
return_items = [self.parse_track(i) for i in pager.items]
resp = self._make_get_request('getPlaylistTracks', f'playlists/{uri.object_id}/tracks', params=params)
if resp:
if resp.get('items', None):
tracks += [parse.parse_track(i) for i in resp.get('items', None)]
else:
logger.warning(f'{uri} no items returned')
if resp.get('next', None):
more_tracks = self.get_playlist_tracks(uri, offset + limit)
if more_tracks:
tracks += more_tracks
else:
logger.warning(f'{uri} error on response')
return tracks
return return_items
def get_available_devices(self) -> Optional[List[Device]]:
logger.info("retrieving")
resp = self._make_get_request('getAvailableDevices', 'me/player/devices')
resp = self.make_get_request('getAvailableDevices', 'me/player/devices')
if resp:
return [parse.parse_device(i) for i in resp['devices']]
return [self.parse_device(i) for i in resp['devices']]
else:
logger.error('no devices returned')
return None
def get_recently_played_tracks(self,
response_limit: int = None,
after: datetime = None,
before: datetime = None) -> Optional[List[PlayedTrack]]:
after: datetime.datetime = None,
before: datetime.datetime = None) -> Optional[List[PlayedTrack]]:
logger.info("retrieving")
params = dict()
if response_limit:
params['limit'] = response_limit
if after and before:
raise ValueError('cant have before and after')
if after:
@ -251,10 +250,17 @@ class Network:
if before:
params['before'] = int(before.timestamp() * 1000)
resp = self._make_get_request('getRecentlyPlayedTracks', 'me/player/recently-played', params=params)
resp = self.make_get_request('getRecentlyPlayedTracks', 'me/player/recently-played', params=params)
if resp:
return [parse.parse_track(i) for i in resp['items']]
pager = PageCollection(self, page=resp)
if response_limit:
pager.total_limit = response_limit
else:
pager.total_limit = 20
pager.continue_iteration()
return [self.parse_track(i) for i in pager.items]
else:
logger.error('no tracks returned')
return None
@ -263,14 +269,14 @@ class Network:
logger.info("retrieved")
resp = self._make_get_request('getPlayer', 'me/player')
resp = self.make_get_request('getPlayer', 'me/player')
if resp:
return parse.parse_currently_playing(resp)
return self.parse_currently_playing(resp)
else:
logger.info('no player returned')
return None
def get_device_id(self, devicename) -> Optional[str]:
def get_device_id(self, devicename: str) -> Optional[str]:
logger.info(f"{devicename}")
@ -284,7 +290,7 @@ class Network:
else:
logger.error('no devices returned')
def change_playback_device(self, device_id):
def change_playback_device(self, device_id: str):
logger.info(device_id)
@ -293,13 +299,13 @@ class Network:
'play': True
}
resp = self._make_put_request('changePlaybackDevice', 'me/player', json=json)
resp = self.make_put_request('changePlaybackDevice', 'me/player', json=json)
if resp:
return True
else:
return None
def play(self, uri: Uri = None, uris: List[Uri] = None, deviceid=None) -> Optional[Response]:
def play(self, uri: Uri = None, uris: List[Uri] = None, deviceid: str = None) -> Optional[Response]:
logger.info(f"{uri}{' ' + deviceid if deviceid is not None else ''}")
@ -318,13 +324,13 @@ class Network:
if uris:
payload['uris'] = [str(i) for i in uris[:200]]
req = self._make_put_request('play', 'me/player/play', params=params, json=payload)
req = self.make_put_request('play', 'me/player/play', params=params, json=payload)
if req:
return req
else:
logger.error('error playing')
def pause(self, deviceid=None) -> Optional[Response]:
def pause(self, deviceid: str = None) -> Optional[Response]:
logger.info(f"{deviceid if deviceid is not None else ''}")
@ -333,13 +339,13 @@ class Network:
else:
params = None
req = self._make_put_request('pause', 'me/player/pause', params=params)
req = self.make_put_request('pause', 'me/player/pause', params=params)
if req:
return req
else:
logger.error('error pausing')
def next(self, deviceid=None) -> Optional[Response]:
def next(self, deviceid: str = None) -> Optional[Response]:
logger.info(f"{deviceid if deviceid is not None else ''}")
@ -348,13 +354,13 @@ class Network:
else:
params = None
req = self._make_post_request('next', 'me/player/next', params=params)
req = self.make_post_request('next', 'me/player/next', params=params)
if req:
return req
else:
logger.error('error skipping')
def previous(self, deviceid=None) -> Optional[Response]:
def previous(self, deviceid: str = None) -> Optional[Response]:
logger.info(f"{deviceid if deviceid is not None else ''}")
@ -363,13 +369,13 @@ class Network:
else:
params = None
req = self._make_post_request('previous', 'me/player/previous', params=params)
req = self.make_post_request('previous', 'me/player/previous', params=params)
if req:
return req
else:
logger.error('error reversing')
def set_shuffle(self, state, deviceid=None) -> Optional[Response]:
def set_shuffle(self, state: bool, deviceid: str = None) -> Optional[Response]:
logger.info(f"{state}{' ' + deviceid if deviceid is not None else ''}")
@ -378,24 +384,24 @@ class Network:
if deviceid is not None:
params['device_id'] = deviceid
req = self._make_put_request('setShuffle', 'me/player/shuffle', params=params)
req = self.make_put_request('setShuffle', 'me/player/shuffle', params=params)
if req:
return req
else:
logger.error(f'error setting shuffle {state}')
def set_volume(self, volume, deviceid=None) -> Optional[Response]:
def set_volume(self, volume: int, deviceid: str = None) -> Optional[Response]:
logger.info(f"{volume}{' ' + deviceid if deviceid is not None else ''}")
if volume.isdigit() and 0 <= int(volume) <= 100:
if 0 <= int(volume) <= 100:
params = {'volume_percent': volume}
if deviceid is not None:
params['device_id'] = deviceid
req = self._make_put_request('setVolume', 'me/player/volume', params=params)
req = self.make_put_request('setVolume', 'me/player/volume', params=params)
if req:
return req
else:
@ -414,8 +420,8 @@ class Network:
json = {"uris": [str(i) for i in uris[:100]]}
req = self._make_put_request('replacePlaylistTracks', f'playlists/{uri.object_id}/tracks',
json=json, headers=headers)
req = self.make_put_request('replacePlaylistTracks', f'playlists/{uri.object_id}/tracks',
json=json, headers=headers)
if req is not None:
@ -428,10 +434,10 @@ class Network:
def change_playlist_details(self,
uri: Uri,
name=None,
public=None,
collaborative=None,
description=None) -> Optional[Response]:
name: str = None,
public: bool = None,
collaborative: bool = None,
description: str = None) -> Optional[Response]:
logger.info(f"{uri}")
@ -455,8 +461,8 @@ class Network:
logger.warning('update dictionairy length 0')
return None
else:
req = self._make_put_request('changePlaylistDetails', f'playlists/{uri.object_id}',
json=json, headers=headers)
req = self.make_put_request('changePlaylistDetails', f'playlists/{uri.object_id}',
json=json, headers=headers)
if req:
return req
else:
@ -471,8 +477,8 @@ class Network:
json = {"uris": [str(i) for i in uris[:100]]}
req = self._make_post_request('addPlaylistTracks', f'playlists/{uri.object_id}/tracks',
json=json, headers=headers)
req = self.make_post_request('addPlaylistTracks', f'playlists/{uri.object_id}/tracks',
json=json, headers=headers)
if req is not None:
resp = req.json()
@ -506,10 +512,10 @@ class Network:
logger.warning('update dictionairy length 0')
return None
else:
resp = self._make_get_request('getRecommendations', 'recommendations', params=params)
resp = self.make_get_request('getRecommendations', 'recommendations', params=params)
if resp:
if 'tracks' in resp:
return [parse.parse_track(i) for i in resp['tracks']]
return [self.parse_track(i) for i in resp['tracks']]
else:
logger.error('no tracks returned')
return None
@ -526,9 +532,11 @@ class Network:
self.replace_playlist_tracks(playlist.uri, [])
elif playlist.tracks:
if append_tracks:
self.add_playlist_tracks(playlist.uri, [i.uri for i in playlist.tracks])
self.add_playlist_tracks(playlist.uri, [i.uri for i in playlist.tracks if
isinstance(i, SpotifyTrack)])
else:
self.replace_playlist_tracks(playlist.uri, [i.uri for i in playlist.tracks])
self.replace_playlist_tracks(playlist.uri, [i.uri for i in playlist.tracks if
isinstance(i, SpotifyTrack)])
if playlist.name or playlist.collaborative or playlist.public or playlist.description:
self.change_playlist_details(playlist.uri,
@ -562,9 +570,368 @@ class Network:
'range_length': range_length,
'insert_before': insert_before}
resp = self._make_put_request('reorderPlaylistTracks', f'playlists/{uri.object_id}/tracks', json=json)
resp = self.make_put_request('reorderPlaylistTracks', f'playlists/{uri.object_id}/tracks', json=json)
if resp:
return resp
else:
logger.error('error reordering playlist')
@staticmethod
def parse_artist(artist_dict) -> SpotifyArtist:
name = artist_dict.get('name', None)
href = artist_dict.get('href', None)
uri = artist_dict.get('uri', None)
genres = artist_dict.get('genres', None)
popularity = artist_dict.get('popularity', None)
if name is None:
raise KeyError('artist name not found')
return SpotifyArtist(name,
href=href,
uri=uri,
genres=genres,
popularity=popularity)
def parse_album(self, album_dict) -> Union[SpotifyAlbum, LibraryAlbum]:
if 'album' in album_dict:
album = album_dict.get('album', None)
else:
album = album_dict
name = album.get('name', None)
if name is None:
raise KeyError('album name not found')
artists = [self.parse_artist(i) for i in album.get('artists', [])]
href = album.get('href', None)
uri = album.get('uri', None)
genres = album.get('genres', None)
if album.get('tracks'):
if 'next' in album['tracks']:
track_pager = PageCollection(net=self, page=album['tracks'])
track_pager.continue_iteration()
tracks = [self.parse_track(i) for i in track_pager.items]
else:
tracks = [self.parse_track(i) for i in album.get('tracks', [])]
else:
tracks = []
release_date = album.get('release_date', None)
release_date_precision = album.get('release_date_precision', None)
label = album.get('label', None)
popularity = album.get('popularity', None)
added_at = album_dict.get('added_at', None)
if added_at:
added_at = datetime.datetime.strptime(added_at, '%Y-%m-%dT%H:%M:%S%z')
if added_at:
return LibraryAlbum(name=name,
artists=artists,
href=href,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity,
added_at=added_at)
else:
return SpotifyAlbum(name=name,
artists=artists,
href=href,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity)
def parse_track(self, track_dict) -> Union[Track, SpotifyTrack, PlaylistTrack, PlayedTrack]:
if 'track' in track_dict:
track = track_dict.get('track', None)
else:
track = track_dict
name = track.get('name', None)
if name is None:
raise KeyError('track name not found')
if track.get('album', None):
album = self.parse_album(track['album'])
else:
album = None
artists = [self.parse_artist(i) for i in track.get('artists', [])]
href = track.get('href', None)
uri = track.get('uri', None)
disc_number = track.get('disc_number', None)
duration_ms = track.get('duration_ms', None)
explicit = track.get('explicit', None)
is_playable = track.get('is_playable', None)
popularity = track.get('popularity', None)
added_by = self.parse_user(track_dict.get('added_by')) if track_dict.get('added_by', None) else None
added_at = track_dict.get('added_at', None)
if added_at:
added_at = datetime.datetime.strptime(added_at, '%Y-%m-%dT%H:%M:%S%z')
is_local = track_dict.get('is_local', None)
played_at = track_dict.get('played_at', None)
if played_at:
played_at = datetime.datetime.strptime(played_at, '%Y-%m-%dT%H:%M:%S.%f%z')
context = track_dict.get('context', None)
if context:
context = self.parse_context(context)
if added_at or added_by or is_local:
return PlaylistTrack(name=name,
album=album,
artists=artists,
added_at=added_at,
added_by=added_by,
is_local=is_local,
href=href,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
elif played_at or context:
return PlayedTrack(name=name,
album=album,
artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity,
played_at=played_at,
context=context)
else:
return SpotifyTrack(name=name,
album=album,
artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
@staticmethod
def parse_user(user_dict) -> User:
display_name = user_dict.get('display_name', None)
spotify_id = user_dict.get('id', None)
href = user_dict.get('href', None)
uri = user_dict.get('uri', None)
return User(spotify_id,
href=href,
uri=uri,
display_name=display_name)
def parse_playlist(self, playlist_dict) -> SpotifyPlaylist:
collaborative = playlist_dict.get('collaborative', None)
ext_spotify = None
if playlist_dict.get('external_urls', None):
if playlist_dict['external_urls'].get('spotify', None):
ext_spotify = playlist_dict['external_urls']['spotify']
href = playlist_dict.get('href', None)
description = playlist_dict.get('description', None)
name = playlist_dict.get('name', None)
if playlist_dict.get('owner', None):
owner = self.parse_user(playlist_dict.get('owner'))
else:
owner = None
public = playlist_dict.get('public', None)
uri = playlist_dict.get('uri', None)
return SpotifyPlaylist(uri=uri,
name=name,
owner=owner,
description=description,
href=href,
collaborative=collaborative,
public=public,
ext_spotify=ext_spotify)
@staticmethod
def parse_context(context_dict) -> Context:
return Context(object_type=context_dict['type'],
href=context_dict['href'],
external_spot=context_dict['external_urls']['spotify'],
uri=context_dict['uri'])
def parse_currently_playing(self, play_dict) -> CurrentlyPlaying:
return CurrentlyPlaying(
context=self.parse_context(play_dict['context']) if play_dict['context'] is not None else None,
timestamp=datetime.datetime.fromtimestamp(play_dict['timestamp'] / 1000),
progress_ms=play_dict['progress_ms'],
is_playing=play_dict['is_playing'],
track=self.parse_track(play_dict['item']),
device=self.parse_device(play_dict['device']),
shuffle=play_dict['shuffle_state'],
repeat=play_dict['repeat_state'],
currently_playing_type=play_dict['currently_playing_type'])
@staticmethod
def parse_device(device_dict) -> Device:
return Device(device_id=device_dict['id'],
is_active=device_dict['is_active'],
is_private_session=device_dict['is_private_session'],
is_restricted=device_dict['is_restricted'],
name=device_dict['name'],
object_type=Device.DeviceType[device_dict['type'].upper()],
volume=device_dict['volume_percent'])
class PageCollection:
def __init__(self,
net: Network,
url: str = None,
page_limit: int = 50,
total_limit: int = None,
name: str = None,
page: dict = None):
self.net = net
self.url = url
self.pages = []
self.name = name
self.page_limit = page_limit
self.total_limit = total_limit
if page:
self.add_page(page)
def __len__(self):
length = 0
for page in self.pages:
length += len(page.items)
return length
@property
def total(self):
if len(self.pages) > 0:
return self.pages[0].total
return 0
@property
def items(self):
items = []
for page in self.pages:
items += page.items
return items[:self.total_limit]
def continue_iteration(self):
if len(self) < self.total_limit:
if len(self.pages) > 0:
if self.pages[-1].next_link:
self.iterate(self.pages[-1].next_link)
else:
raise IndexError('no pages')
def iterate(self, url=None):
logger.debug(f'iterating {self.name}')
params = {'limit': self.page_limit}
if url:
resp = self.net.make_get_request(method=self.name, whole_url=url, params=params)
else:
if self.url:
resp = self.net.make_get_request(method=self.name, url=self.url, params=params)
else:
raise ValueError('no url to query')
if resp:
page = self.add_page(resp)
if page.next_link:
if self.total_limit:
if len(self) < self.total_limit:
self.iterate(page.next_link)
else:
self.iterate(page.next_link)
else:
logger.error('no response')
def add_page(self, page_dict):
page = self.parse_page(page_dict)
self.pages.append(page)
return page
@staticmethod
def parse_page(page_dict):
return Page(
href=page_dict['href'],
items=page_dict['items'],
response_limit=page_dict['limit'],
next_link=page_dict['next'],
offset=page_dict.get('offset', None),
previous=page_dict.get('previous', None),
total=page_dict.get('total', None))
class Page:
def __init__(self,
href: str,
items: List,
response_limit: int,
next_link: str,
previous: str,
total: int,
offset: int = None):
self.href = href
self.items = items
self.response_limit = response_limit
self.next_link = next_link
self.offset = offset
self.previous = previous
self.total = total

View File

@ -1,231 +0,0 @@
from spotframework.model.artist import SpotifyArtist
from spotframework.model.album import SpotifyAlbum
from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack, PlayedTrack
from spotframework.model.playlist import SpotifyPlaylist
from spotframework.model.user import User
from spotframework.model.service import Context, CurrentlyPlaying, Device
import datetime
from typing import Union
def parse_artist(artist_dict) -> SpotifyArtist:
name = artist_dict.get('name', None)
href = artist_dict.get('href', None)
uri = artist_dict.get('uri', None)
genres = artist_dict.get('genres', None)
popularity = artist_dict.get('popularity', None)
if name is None:
raise KeyError('artist name not found')
return SpotifyArtist(name,
href=href,
uri=uri,
genres=genres,
popularity=popularity)
def parse_album(album_dict) -> SpotifyAlbum:
name = album_dict.get('name', None)
if name is None:
raise KeyError('album name not found')
artists = [parse_artist(i) for i in album_dict.get('artists', [])]
href = album_dict.get('href', None)
uri = album_dict.get('uri', None)
genres = album_dict.get('genres', None)
tracks = [parse_track(i) for i in album_dict.get('tracks', [])]
release_date = album_dict.get('release_date', None)
release_date_precision = album_dict.get('release_date_precision', None)
label = album_dict.get('label', None)
popularity = album_dict.get('popularity', None)
return SpotifyAlbum(name=name,
artists=artists,
href=href,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity)
def parse_track(track_dict) -> Union[Track, SpotifyTrack, PlayedTrack]:
if 'track' in track_dict:
track = track_dict.get('track', None)
else:
track = track_dict
name = track.get('name', None)
if name is None:
raise KeyError('track name not found')
if track.get('album', None):
album = parse_album(track['album'])
else:
album = None
artists = [parse_artist(i) for i in track.get('artists', [])]
href = track.get('href', None)
uri = track.get('uri', None)
disc_number = track.get('disc_number', None)
duration_ms = track.get('duration_ms', None)
explicit = track.get('explicit', None)
is_playable = track.get('is_playable', None)
popularity = track.get('popularity', None)
added_by = parse_user(track_dict.get('added_by')) if track_dict.get('added_by', None) else None
added_at = track_dict.get('added_at', None)
if added_at:
added_at = datetime.datetime.strptime(added_at, '%Y-%m-%dT%H:%M:%S%z')
is_local = track_dict.get('is_local', None)
played_at = track_dict.get('played_at', None)
if played_at:
played_at = datetime.datetime.strptime(played_at, '%Y-%m-%dT%H:%M:%S.%f%z')
context = track_dict.get('context', None)
if context:
context = parse_context(context)
if added_at or added_by or is_local:
return PlaylistTrack(name=name,
album=album,
artists=artists,
added_at=added_at,
added_by=added_by,
is_local=is_local,
href=href,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
elif played_at or context:
return PlayedTrack(name=name,
album=album,
artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity,
played_at=played_at,
context=context)
else:
return SpotifyTrack(name=name,
album=album,
artists=artists,
href=href,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
def parse_user(user_dict) -> User:
display_name = user_dict.get('display_name', None)
spotify_id = user_dict.get('id', None)
href = user_dict.get('href', None)
uri = user_dict.get('uri', None)
return User(spotify_id,
href=href,
uri=uri,
display_name=display_name)
def parse_playlist(playlist_dict) -> SpotifyPlaylist:
collaborative = playlist_dict.get('collaborative', None)
ext_spotify = None
if playlist_dict.get('external_urls', None):
if playlist_dict['external_urls'].get('spotify', None):
ext_spotify = playlist_dict['external_urls']['spotify']
href = playlist_dict.get('href', None)
description = playlist_dict.get('description', None)
name = playlist_dict.get('name', None)
if playlist_dict.get('owner', None):
owner = parse_user(playlist_dict.get('owner'))
else:
owner = None
public = playlist_dict.get('public', None)
uri = playlist_dict.get('uri', None)
return SpotifyPlaylist(uri=uri,
name=name,
owner=owner,
description=description,
href=href,
collaborative=collaborative,
public=public,
ext_spotify=ext_spotify)
def parse_context(context_dict) -> Context:
return Context(object_type=context_dict['type'],
href=context_dict['href'],
external_spot=context_dict['external_urls']['spotify'],
uri=context_dict['uri'])
def parse_currently_playing(play_dict) -> CurrentlyPlaying:
return CurrentlyPlaying(context=parse_context(play_dict['context']) if play_dict['context'] is not None else None,
timestamp=datetime.datetime.fromtimestamp(play_dict['timestamp']/1000),
progress_ms=play_dict['progress_ms'],
is_playing=play_dict['is_playing'],
track=parse_track(play_dict['item']),
device=parse_device(play_dict['device']),
shuffle=play_dict['shuffle_state'],
repeat=play_dict['repeat_state'],
currently_playing_type=play_dict['currently_playing_type'])
def parse_device(device_dict) -> Device:
return Device(device_id=device_dict['id'],
is_active=device_dict['is_active'],
is_private_session=device_dict['is_private_session'],
is_restricted=device_dict['is_restricted'],
name=device_dict['name'],
object_type=Device.DeviceType[device_dict['type'].upper()],
volume=device_dict['volume_percent'])