url type check, podcast methods, player device passing

This commit is contained in:
aj 2020-08-07 19:00:10 +01:00
parent cceba0bd78
commit 2e6585325b
4 changed files with 250 additions and 49 deletions

View File

@ -28,9 +28,10 @@ class SimplifiedEpisode:
name: str
release_date: datetime
release_date_precision: str
resume_point: ResumePoint
type: str
uri: Union[str, Uri]
language: str = None # soon to be deprecated
resume_point: ResumePoint = None
def __post_init__(self):
@ -71,6 +72,7 @@ class SimplifiedShow:
name: str
publisher: str
type: str
total_episodes: int
uri: Union[str, Uri]
def __post_init__(self):
@ -79,15 +81,15 @@ class SimplifiedShow:
self.uri = Uri(self.uri)
if self.uri:
if self.uri.object_type != Uri.ObjectType.episode:
raise TypeError('provided uri not for an episode')
if self.uri.object_type != Uri.ObjectType.show:
raise TypeError('provided uri not for an show')
if all((isinstance(i, dict) for i in self.images)):
self.images = [init_with_key_filter(Image, i) for i in self.images]
@dataclass
class EpisodeFull(SimplifiedEpisode):
show: SimplifiedShow
show: SimplifiedShow = None
def __post_init__(self):
super().__post_init__()

View File

@ -18,9 +18,9 @@ from spotframework.model.artist import ArtistFull
from spotframework.model.album import AlbumFull, LibraryAlbum, SimplifiedAlbum
from spotframework.model.track import SimplifiedTrack, TrackFull, PlaylistTrack, PlayedTrack, LibraryTrack, \
AudioFeatures, Device, CurrentlyPlaying, Recommendations
from spotframework.model.podcast import SimplifiedEpisode, EpisodeFull
from spotframework.model.podcast import SimplifiedEpisode, EpisodeFull, SimplifiedShow, ShowFull
from spotframework.model.uri import Uri
from spotframework.util.decorators import inject_uri
from spotframework.util.decorators import inject_uri, uri_type_check
limit = 50
@ -251,13 +251,13 @@ class Network:
self.user.user = self.get_current_user()
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.playlist)
def get_playlist(self,
uri: Uri,
tracks: bool = True) -> FullPlaylist:
"""get playlist object with tracks for uri
:param uri: target request uri
:param uri_string: target request uri as string
:param tracks: populate tracks of playlist during generation
:return: playlist object
"""
@ -394,13 +394,13 @@ class Network:
logger.error('no playlists returned to filter')
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.playlist)
def get_playlist_tracks(self,
uri: Uri,
response_limit: int = None) -> List[PlaylistTrack]:
"""get list of playlists tracks for uri
:param uri: target playlist uri
:param uri_string: target playlist uri as string
:param response_limit: max tracks to return
:return: list of playlist tracks if available
"""
@ -419,6 +419,32 @@ class Network:
return return_items
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.show)
def get_show_episodes(self,
uri: Uri,
response_limit: int = None) -> List[SimplifiedEpisode]:
"""get list of shows episodes for uri
:param uri: target show uri
:param response_limit: max episodes to return
:return: list of show episodes if available
"""
logger.info(f"paging episodes for {uri}")
pager = PageCollection(net=self, url=f'shows/{uri.object_id}/episodes', name='getShowEpisodes')
if response_limit:
pager.total_limit = response_limit
pager.iterate()
return_items = [init_with_key_filter(SimplifiedEpisode, i) for i in pager.items]
if len(return_items) == 0:
logger.error('no episodes returned')
return return_items
def get_available_devices(self) -> List[Device]:
"""get users available devices"""
@ -587,6 +613,7 @@ class Network:
logger.error(f"{volume} not accepted value")
@inject_uri
@uri_type_check(uri_type=Uri.ObjectType.playlist, uris_type=(Uri.ObjectType.track, Uri.ObjectType.episode))
def replace_playlist_tracks(self,
uri: Uri,
uris: List[Uri]) -> Optional[List[str]]:
@ -599,6 +626,7 @@ class Network:
return self.add_playlist_tracks(uri=uri, uris=uris[100:])
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.playlist)
def change_playlist_details(self,
uri: Uri,
name: str = None,
@ -619,6 +647,7 @@ class Network:
description=description)
@inject_uri
@uri_type_check(uri_type=Uri.ObjectType.playlist, uris_type=(Uri.ObjectType.track, Uri.ObjectType.episode))
def add_playlist_tracks(self, uri: Uri, uris: List[Uri]) -> List[str]:
logger.info(f"adding {len(uris)} tracks to {uri}")
@ -684,6 +713,7 @@ class Network:
logger.error('playlist has no id')
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.playlist)
def reorder_playlist_tracks(self,
uri: Uri,
range_start: int,
@ -708,6 +738,7 @@ class Network:
insert_before=insert_before)
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.track)
def get_track_audio_features(self, uris: List[Uri]) -> Optional[List[AudioFeatures]]:
logger.info(f'getting {len(uris)} features')
@ -759,13 +790,11 @@ class Network:
raise TypeError('must provide either single or list of spotify tracks')
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.track)
def get_tracks(self, uris: List[Uri]) -> List[TrackFull]:
logger.info(f'getting {len(uris)} tracks')
if not all(i.object_type == Uri.ObjectType.track for i in uris):
raise TypeError('uris must be of type track')
tracks = []
chunked_uris = list(self.chunk(uris, 50))
for chunk in chunked_uris:
@ -776,6 +805,7 @@ class Network:
return tracks
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.track)
def get_track(self, uri) -> Optional[TrackFull]:
track = self.get_tracks(uris=[uri])
@ -785,13 +815,11 @@ class Network:
return None
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.album)
def get_albums(self, uris: List[Uri]) -> List[AlbumFull]:
logger.info(f'getting {len(uris)} albums')
if not all(i.object_type == Uri.ObjectType.album for i in uris):
raise TypeError('uris must be of type album')
albums = []
chunked_uris = list(self.chunk(uris, 50))
for chunk in chunked_uris:
@ -802,6 +830,7 @@ class Network:
return albums
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.album)
def get_album(self, uri: Uri) -> Optional[AlbumFull]:
album = self.get_albums(uris=[uri])
@ -811,13 +840,11 @@ class Network:
return None
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.artist)
def get_artists(self, uris) -> List[ArtistFull]:
logger.info(f'getting {len(uris)} artists')
if not all(i.object_type == Uri.ObjectType.artist for i in uris):
raise TypeError('uris must be of type artist')
artists = []
chunked_uris = list(self.chunk(uris, 50))
for chunk in chunked_uris:
@ -828,6 +855,7 @@ class Network:
return artists
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.artist)
def get_artist(self, uri) -> Optional[ArtistFull]:
artist = self.get_artists(uris=[uri])
@ -836,6 +864,67 @@ class Network:
else:
return None
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.show)
def get_shows(self, uris) -> List[SimplifiedShow]:
logger.info(f'getting {len(uris)} shows')
shows = []
chunked_uris = list(self.chunk(uris, 50))
for chunk in chunked_uris:
resp = self.get_request(url='shows', ids=','.join([i.object_id for i in chunk]))
if resp:
shows += [init_with_key_filter(SimplifiedShow, i) for i in resp.get('shows', [])]
return shows
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.show)
def get_show(self, uri, episodes: bool = True) -> Optional[ShowFull]:
logger.info(f"retrieving {uri}")
resp = self.get_request(f'shows/{uri.object_id}')
show = init_with_key_filter(ShowFull, resp)
if resp.get('episodes') and episodes:
if 'next' in resp['episodes']:
logger.debug(f'paging episodes for {uri}')
track_pager = PageCollection(net=self, page=resp['episodes'])
track_pager.continue_iteration()
show.episodes = [init_with_key_filter(SimplifiedEpisode, i) for i in track_pager.items]
else:
logger.debug(f'parsing {len(resp.get("episodes"))} tracks for {uri}')
show.episodes = [init_with_key_filter(SimplifiedEpisode, i) for i in resp.get('episodes', [])]
return show
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.episode)
def get_episodes(self, uris) -> List[EpisodeFull]:
logger.info(f'getting {len(uris)} episodes')
episodes = []
chunked_uris = list(self.chunk(uris, 50))
for chunk in chunked_uris:
resp = self.get_request(url='episodes', ids=','.join([i.object_id for i in chunk]))
if resp:
episodes += [init_with_key_filter(EpisodeFull, i) for i in resp.get('episodes', [])]
return episodes
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.episode)
def get_episode(self, uri) -> EpisodeFull:
logger.info(f"retrieving {uri}")
resp = self.get_request(f'episodes/{uri.object_id}')
return init_with_key_filter(EpisodeFull, resp)
def search(self,
query_types: List[Uri.ObjectType],
query: str = None,

View File

@ -1,13 +1,46 @@
import functools
from typing import List, Union
import logging
from spotframework.net.network import Network, SpotifyNetworkException
from spotframework.model.track import SimplifiedTrack, Context, Device
from spotframework.model.album import AlbumFull
from spotframework.model.playlist import FullPlaylist
from typing import List, Union
import logging
from spotframework.util.decorators import inject_uri
logger = logging.getLogger(__name__)
class Player:
class Decorators:
@staticmethod
def inject_device(_func=None):
def decorator_inject_device(func):
@functools.wraps(func)
def inject_device_wrapper(self, *args, **kwargs):
if kwargs.get('device_name'):
kwargs['device'] = self.get_device(kwargs['device_name'], 'name')
del kwargs['device_name']
elif kwargs.get('device_id'):
kwargs['device'] = self.get_device(kwargs['device_id'], 'id')
del kwargs['device_id']
elif kwargs.get('device'):
pass
return func(self, *args, **kwargs)
return inject_device_wrapper
if _func is None:
return decorator_inject_device
else:
return decorator_inject_device(_func)
def __init__(self,
net: Network):
@ -24,8 +57,9 @@ class Player:
def available_devices(self):
try:
return self.net.get_available_devices()
except SpotifyNetworkException:
except SpotifyNetworkException as e:
logger.exception(f'error retrieving current devices')
raise e
@property
def status(self):
@ -37,17 +71,13 @@ class Player:
except SpotifyNetworkException:
logger.exception(f'error retrieving current devices')
@inject_uri(flatten_to_uris=True, uri_optional=True, uris_optional=True)
@Decorators.inject_device
def play(self,
context: Union[Context, AlbumFull, FullPlaylist] = None,
tracks: List[SimplifiedTrack] = None,
uris: List = None,
device: Device = None,
device_name: str = None):
if device_name:
searched_device = next((i for i in self.available_devices if i.name == device_name), None)
if searched_device:
device = searched_device
device: Device = None):
try:
if context and (tracks or uris):
raise Exception('cant execute context and track list')
@ -73,65 +103,86 @@ class Player:
except SpotifyNetworkException:
logger.exception(f'error playing')
@Decorators.inject_device
def change_device(self, device: Device):
try:
self.net.change_playback_device(device.id)
except SpotifyNetworkException:
logger.exception(f'error changing device to {device.name}')
def pause(self):
@Decorators.inject_device
def pause(self, device = None):
try:
if device is not None:
self.net.pause(deviceid=device.id)
else:
self.net.pause()
except SpotifyNetworkException:
logger.exception(f'error pausing')
def toggle_playback(self):
@Decorators.inject_device
def toggle_playback(self, device: Device = None):
status = self.status
try:
if status:
if status.is_playing:
self.pause()
self.pause(device=device)
else:
self.play()
self.play(device=device)
else:
logger.warning('no current playback, playing')
self.play()
self.play(device=device)
except SpotifyNetworkException:
logger.exception(f'error toggling playback')
def next(self):
@Decorators.inject_device
def next(self, device: Device = None):
try:
if device is not None:
self.net.next(deviceid=device.id)
else:
self.net.next()
except SpotifyNetworkException:
logger.exception(f'error skipping track')
def previous(self):
@Decorators.inject_device
def previous(self, device: Device = None):
try:
if device is not None:
self.net.previous(deviceid=device.id)
else:
self.net.previous()
except SpotifyNetworkException:
logger.exception(f'error reversing track')
def shuffle(self, state=None):
@Decorators.inject_device
def shuffle(self, device: Device = None, state=None):
if state is not None:
if isinstance(state, bool):
try:
self.net.set_shuffle(state)
if device is not None:
self.net.set_shuffle(deviceid=device.id, state=state)
else:
self.net.set_shuffle(state=state)
except SpotifyNetworkException:
logger.exception(f'error setting shuffle')
else:
raise TypeError(f'{state} is not bool')
else:
status = self.status
if status.shuffle_state:
self.shuffle(state=False)
else:
self.shuffle(state=True)
else:
self.shuffle(device=device, state=not self.status.shuffle_state)
@Decorators.inject_device
def volume(self, value: int, device: Device = None):
if 0 <= int(value) <= 100:
try:
if device:
if device is not None:
self.net.set_volume(value, deviceid=device.id)
else:
self.net.set_volume(value)
@ -139,3 +190,20 @@ class Player:
logger.exception(f'error setting volume to {value}')
else:
logger.error(f'{value} not between 0 and 100')
def get_device(self, device_in, attr):
if isinstance(device_in, str):
try:
searched_device = next((i for i in self.available_devices if getattr(i, attr) == device_in), None)
if searched_device is not None:
return searched_device
else:
logger.error(f'device not returned for {device_in}, {attr}')
except SpotifyNetworkException:
logger.exception(f'error retrieving current devices')
elif isinstance(device_in, Device): return device_in
else: raise TypeError(f'invalid uri type provided - {type(device_in)}')

View File

@ -1,7 +1,10 @@
import functools
import logging
from collections.abc import Iterable
from spotframework.util import get_uri
from spotframework.model.uri import Uri
logger = logging.getLogger(__name__)
@ -18,14 +21,21 @@ def debug(func):
return value
return wrapper_debug
def inject_uri(_func=None, *, uri=True, uris=True, uri_optional=False, uris_optional=False):
def inject_uri(_func=None, *, uri=True, uris=True, flatten_to_uris=False, uri_optional=False, uris_optional=False):
def decorator_inject_uri(func):
@functools.wraps(func)
def inject_uri_wrapper(*args, **kwargs):
if uri:
if uri_optional:
if flatten_to_uris:
kwargs['uris'] = [ get_uri(kwargs['uri']) ] if kwargs.get('uri') else None
else:
kwargs['uri'] = get_uri(kwargs['uri']) if kwargs.get('uri') else None
else:
if flatten_to_uris:
kwargs['uris'] = [ get_uri(kwargs['uri']) ]
else:
kwargs['uri'] = get_uri(kwargs['uri'])
@ -44,3 +54,35 @@ def inject_uri(_func=None, *, uri=True, uris=True, uri_optional=False, uris_opti
else:
return decorator_inject_uri(_func)
def contain_uri_types(types_in):
if isinstance(types_in, Iterable):
for uri_type in types_in:
if not isinstance(uri_type, Uri.ObjectType):
raise TypeError(f'provided type not a uri object type, {type(uri_type)}')
types = types_in
elif isinstance(types_in, Uri.ObjectType):
types = [types_in]
else:
raise TypeError(f'provided type not a uri object type, {type(types_in)}')
return types
def uri_type_check(uri_type = None, uris_type = None):
def decorator_type_check(func):
@functools.wraps(func)
def wrapper_type_check(*args, **kwargs):
if uri_type is not None:
if kwargs['uri'].object_type not in contain_uri_types(uri_type):
raise TypeError(f'uri not of required type {uri_type}, {kwargs["uri"].object_type}')
if uris_type is not None:
for uri in kwargs['uris']:
if uri.object_type not in contain_uri_types(uris_type):
raise TypeError(f'uri not of required type {uris_type}, {uri.object_type}')
return func(*args, **kwargs)
return wrapper_type_check
return decorator_type_check