concise method names, some generator filters

This commit is contained in:
aj 2020-08-12 09:28:41 +01:00
parent 2e6585325b
commit 10ee5501e8
10 changed files with 98 additions and 104 deletions

View File

@ -59,7 +59,7 @@ if __name__ == '__main__':
date = datetime.datetime.now()
playlists = network.get_user_playlists()
playlists = network.user_playlists()
if data['alarm']['use_month']:
playlisturi = next((i.uri for i in playlists if i.name == month.get_this_month()),
@ -67,10 +67,10 @@ if __name__ == '__main__':
else:
playlisturi = data['alarm']['uri']
network.play(uri=playlisturi, deviceid=network.get_device_id(data['alarm']['device_name']))
network.play(uri=playlisturi, deviceid=network.map_device_name_to_id(data['alarm']['device_name']))
network.set_shuffle(True)
network.set_volume(data['alarm']['volume'])
network.shuffle(True)
network.volume(data['alarm']['volume'])
network.next()
except Exception as e:

View File

@ -31,11 +31,11 @@ if __name__ == '__main__':
refresh_token=os.environ['SPOT_REFRESH'])).refresh_access_token()
try:
playlists = network.get_user_playlists()
playlists = network.user_playlists()
for playlist in playlists:
try:
playlist.tracks = network.get_playlist_tracks(uri=playlist.uri)
playlist.tracks = network.playlist_tracks(uri=playlist.uri)
csvwrite.export_playlist(playlist, totalpath)
except SpotifyNetworkException:
logger.exception(f'error occured during {playlist.name} track retrieval')

View File

@ -221,7 +221,7 @@ class PlaylistSource(TrackSource):
def append_user_playlists(self) -> None:
logger.info('appending user playlists')
playlists = self.net.get_playlists()
playlists = self.net.playlists()
if playlists and len(playlists) > 0:
self.playlists += playlists
else:
@ -231,7 +231,7 @@ class PlaylistSource(TrackSource):
playlist: FullPlaylist) -> None:
logger.info(f"pulling tracks for {playlist.name}")
tracks = self.net.get_playlist_tracks(uri=playlist.uri)
tracks = self.net.playlist_tracks(uri=playlist.uri)
if tracks and len(tracks) > 0:
playlist.tracks = tracks
else:
@ -240,7 +240,7 @@ class PlaylistSource(TrackSource):
def load(self) -> None:
logger.info('loading user playlists')
playlists = self.net.get_playlists()
playlists = self.net.playlists()
if playlists and len(playlists) > 0:
self.playlists = playlists
else:
@ -264,7 +264,7 @@ class PlaylistSource(TrackSource):
if playlist:
playlists.append(playlist)
else:
playlist = self.net.get_playlist(uri=uri)
playlist = self.net.playlist(uri=uri)
if playlist:
playlists.append(playlist)
self.playlists.append(playlist)
@ -306,7 +306,7 @@ class LibraryTrackSource(TrackSource):
def load(self) -> None:
logger.info('loading library tracks')
tracks = self.net.get_library_tracks()
tracks = self.net.saved_tracks()
if tracks and len(tracks) > 0:
self.tracks = tracks
else:
@ -361,9 +361,9 @@ class RecommendationSource(TrackSource):
if len(query_uris) > 0:
recommendations = self.net.get_recommendations(tracks=[i.object_id for i in query_uris
if i.object_type == Uri.ObjectType.track],
response_limit=params.recommendation_limit)
recommendations = self.net.recommendations(tracks=[i.object_id for i in query_uris
if i.object_type == Uri.ObjectType.track],
response_limit=params.recommendation_limit)
if recommendations and len(recommendations) > 0:
pass
else:

View File

@ -1,37 +1,30 @@
from typing import List
from typing import List, Union, Generator, Tuple
import logging
from spotframework.model.track import SimplifiedTrack, LibraryTrack, PlayedTrack, PlaylistTrack
logger = logging.getLogger(__name__)
def remove_local(tracks: List, include_malformed=True) -> List:
def remove_local(tracks: List, include_malformed=True) -> Generator[SimplifiedTrack, None, None]:
prop = 'is_local'
return_tracks = []
for track in tracks:
if hasattr(track, prop) and isinstance(getattr(track, prop), bool):
if getattr(track, prop) is False:
return_tracks.append(track)
yield track
else:
if include_malformed:
return_tracks.append(track)
return return_tracks
yield track
def get_track_objects(tracks: List) -> (List, List):
inner_tracks = []
whole_tracks = []
def get_track_objects(tracks: List) -> Generator[Tuple[SimplifiedTrack, Union[SimplifiedTrack,
PlaylistTrack,
PlayedTrack,
LibraryTrack]], None, None]:
for track in tracks:
if isinstance(track, SimplifiedTrack):
inner_tracks.append(track)
whole_tracks.append(track)
yield track, track
elif isinstance(track, (PlaylistTrack, PlayedTrack, LibraryTrack)):
inner_tracks.append(track.track)
whole_tracks.append(track)
yield track.track, track
else:
logger.warning(f'invalid type found for {track} ({type(track)}), discarding')
return inner_tracks, whole_tracks

View File

@ -1,35 +1,36 @@
import logging
from typing import List
from typing import Generator, Union, List
from datetime import datetime
from spotframework.model.track import PlaylistTrack, LibraryTrack
logger = logging.getLogger(__name__)
def added_before(tracks: List, boundary: datetime, include_malformed=True) -> List:
def added_before(tracks: List, boundary: datetime, include_malformed=True) -> Generator[Union[PlaylistTrack,
LibraryTrack],
None, None]:
prop = 'added_at'
return_tracks = []
for track in tracks:
if hasattr(track, prop) and isinstance(getattr(track, prop), datetime):
if getattr(track, prop) < boundary:
return_tracks.append(track)
yield track
else:
if include_malformed:
return_tracks.append(track)
return return_tracks
yield track
def added_after(tracks: List, boundary: datetime, include_malformed=True) -> List:
def added_after(tracks: List, boundary: datetime, include_malformed=True) -> Generator[Union[PlaylistTrack,
LibraryTrack],
None, None]:
prop = 'added_at'
return_tracks = []
for track in tracks:
if hasattr(track, prop) and isinstance(getattr(track, prop), datetime):
if getattr(track, prop) > boundary:
return_tracks.append(track)
yield track
else:
if include_malformed:
return_tracks.append(track)
return return_tracks
yield track

View File

@ -12,7 +12,7 @@ def deduplicate_by_id(tracks: List, include_malformed=True) -> List:
prop = 'uri'
return_tracks = []
for inner_track, whole_track in zip(*get_track_objects(tracks)):
for inner_track, whole_track in get_track_objects(tracks):
if hasattr(inner_track, prop) and isinstance(getattr(inner_track, prop), Uri):
if getattr(inner_track, prop) not in [getattr(i, prop) for i in return_tracks]:
return_tracks.append(whole_track)
@ -26,11 +26,11 @@ def deduplicate_by_id(tracks: List, include_malformed=True) -> List:
def deduplicate_by_name(tracks: List, include_malformed=True) -> List:
return_tracks = []
for inner_track, whole_track in zip(*get_track_objects(tracks)):
for inner_track, whole_track in get_track_objects(tracks):
if isinstance(inner_track, TrackFull):
to_check_artists = [i.name.lower() for i in inner_track.artists]
for index, (_inner_track, _whole_track) in enumerate(zip(*get_track_objects(return_tracks))):
for index, (_inner_track, _whole_track) in enumerate(get_track_objects(return_tracks)):
if inner_track.name.lower() == _inner_track.name.lower():
_track_artists = [i.name.lower() for i in _inner_track.artists]

View File

@ -11,7 +11,7 @@ logger = logging.getLogger(__name__)
def sort_by_popularity(tracks: List, reverse: bool = False) -> List:
prop = 'popularity'
return [j for i, j
in sorted([(k, l) for k, l in zip(*get_track_objects(tracks))
in sorted([(k, l) for k, l in get_track_objects(tracks)
if hasattr(k, prop) and isinstance(getattr(k, prop), int)],
key=lambda x: x[0].popularity, reverse=reverse
)
@ -38,7 +38,7 @@ def sort_by_added_date(tracks: List, reverse: bool = False) -> List:
def sort_artist_album_track_number(tracks: List, inner_tracks_only: bool = False) -> List:
sorted_tracks = sorted([(i, w) for i, w in zip(*get_track_objects(tracks))
sorted_tracks = sorted([(i, w) for i, w in get_track_objects(tracks)
if hasattr(i, 'album') and isinstance(getattr(i, 'album'), SimplifiedAlbum)],
key=lambda x: (x[0].album.artists[0].name.lower(),
x[0].album.name.lower(),

View File

@ -22,7 +22,7 @@ class Listener:
self.prev_now_playing: Optional[CurrentlyPlaying] = None
self.now_playing = None
try:
self.now_playing: Optional[CurrentlyPlaying] = net.get_player()
self.now_playing: Optional[CurrentlyPlaying] = net.player()
except SpotifyNetworkException:
logger.exception(f'error occured retrieving currently playing')
@ -33,7 +33,7 @@ class Listener:
logger.debug('updating now playing')
try:
live_now_playing = self.net.get_player()
live_now_playing = self.net.player()
if self.now_playing is None and live_now_playing is None:
return
@ -53,7 +53,7 @@ class Listener:
logger.debug('updating recent tracks')
try:
tracks = self.net.get_recently_played_tracks(response_limit=self.request_size)
tracks = self.net.recently_played_tracks(response_limit=self.request_size)
for track in tracks:
if track.played_at not in [i.played_at for i in self.recent_tracks]:
self.recent_tracks.append(track)

View File

@ -248,13 +248,13 @@ class Network:
return self
def refresh_user_info(self):
self.user.user = self.get_current_user()
self.user.user = self.current_user()
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.playlist)
def get_playlist(self,
uri: Uri,
tracks: bool = True) -> FullPlaylist:
def playlist(self,
uri: Uri,
tracks: bool = True) -> FullPlaylist:
"""get playlist object with tracks for uri
:param uri: target request uri
@ -310,7 +310,7 @@ class Network:
description=description)
return init_with_key_filter(FullPlaylist, req)
def get_playlists(self, response_limit: int = None) -> Optional[List[SimplifiedPlaylist]]:
def playlists(self, response_limit: int = None) -> Optional[List[SimplifiedPlaylist]]:
"""get current users playlists
:param response_limit: max playlists to return
@ -331,7 +331,7 @@ class Network:
return return_items
def get_library_albums(self, response_limit: int = None) -> Optional[List[LibraryAlbum]]:
def saved_albums(self, response_limit: int = None) -> Optional[List[LibraryAlbum]]:
"""get user library albums
:param response_limit: max albums to return
@ -352,7 +352,7 @@ class Network:
return return_items
def get_library_tracks(self, response_limit: int = None) -> Optional[List[LibraryTrack]]:
def saved_tracks(self, response_limit: int = None) -> Optional[List[LibraryTrack]]:
"""get user library tracks
:param response_limit: max tracks to return
@ -373,7 +373,7 @@ class Network:
return return_items
def get_user_playlists(self, response_limit: int = None) -> List[SimplifiedPlaylist]:
def user_playlists(self, response_limit: int = None) -> List[SimplifiedPlaylist]:
"""retrieve user owned playlists
:param response_limit: max playlists to return
@ -382,7 +382,7 @@ class Network:
logger.info('pulling all playlists')
playlists = self.get_playlists(response_limit=response_limit)
playlists = self.playlists(response_limit=response_limit)
if self.user.user is None:
logger.debug('no user info, refreshing for filter')
@ -395,9 +395,9 @@ class Network:
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.playlist)
def get_playlist_tracks(self,
uri: Uri,
response_limit: int = None) -> List[PlaylistTrack]:
def playlist_tracks(self,
uri: Uri,
response_limit: int = None) -> List[PlaylistTrack]:
"""get list of playlists tracks for uri
:param uri: target playlist uri
@ -421,9 +421,9 @@ class Network:
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.show)
def get_show_episodes(self,
uri: Uri,
response_limit: int = None) -> List[SimplifiedEpisode]:
def show_episodes(self,
uri: Uri,
response_limit: int = None) -> List[SimplifiedEpisode]:
"""get list of shows episodes for uri
:param uri: target show uri
@ -445,7 +445,7 @@ class Network:
return return_items
def get_available_devices(self) -> List[Device]:
def available_devices(self) -> List[Device]:
"""get users available devices"""
logger.info("polling available devices")
@ -456,10 +456,10 @@ class Network:
logger.error('no devices returned')
return [init_with_key_filter(Device, i) for i in resp['devices']]
def get_recently_played_tracks(self,
response_limit: int = None,
after: datetime.datetime = None,
before: datetime.datetime = None) -> Optional[List[PlayedTrack]]:
def recently_played_tracks(self,
response_limit: int = None,
after: datetime.datetime = None,
before: datetime.datetime = None) -> Optional[List[PlayedTrack]]:
"""get list of recently played tracks
:param response_limit: max number of tracks to return
@ -489,7 +489,7 @@ class Network:
return [init_with_key_filter(PlayedTrack, i) for i in pager.items]
def get_player(self) -> CurrentlyPlaying:
def player(self) -> CurrentlyPlaying:
"""get currently playing snapshot (player)"""
logger.info("polling player")
@ -497,7 +497,7 @@ class Network:
resp = self.get_request('me/player')
return init_with_key_filter(CurrentlyPlaying, resp)
def get_device_id(self, device_name: str) -> Optional[str]:
def map_device_name_to_id(self, device_name: str) -> Optional[str]:
"""return device id of device as searched for by name
:param device_name: target device name
@ -506,14 +506,14 @@ class Network:
logger.info(f"querying {device_name}")
devices = self.get_available_devices()
devices = self.available_devices()
device = next((i for i in devices if i.name == device_name), None)
if device:
return device.id
else:
logger.error(f'{device_name} not found')
def get_current_user(self) -> PublicUser:
def current_user(self) -> PublicUser:
logger.info(f"getting current user")
resp = self.get_request('me')
@ -586,7 +586,7 @@ class Network:
self.post_request('me/player/previous', params=params)
def set_shuffle(self, state: bool, deviceid: str = None):
def shuffle(self, state: bool, deviceid: str = None):
logger.info(f"{state}{' ' + deviceid if deviceid is not None else ''}")
@ -597,7 +597,7 @@ class Network:
return self.put_request('me/player/shuffle', params=params)
def set_volume(self, volume: int, deviceid: str = None):
def volume(self, volume: int, deviceid: str = None):
logger.info(f"{volume}{' ' + deviceid if deviceid is not None else ''}")
@ -662,10 +662,10 @@ class Network:
return snapshot_ids
def get_recommendations(self,
tracks: List[str] = None,
artists: List[str] = None,
response_limit=10) -> Optional[Recommendations]:
def recommendations(self,
tracks: List[str] = None,
artists: List[str] = None,
response_limit=10) -> Optional[Recommendations]:
logger.info(f'getting {response_limit} recommendations, '
f'tracks: {len(tracks) if tracks is not None else 0}, '
@ -739,7 +739,7 @@ class Network:
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.track)
def get_track_audio_features(self, uris: List[Uri]) -> Optional[List[AudioFeatures]]:
def track_audio_features(self, uris: List[Uri]) -> Optional[List[AudioFeatures]]:
logger.info(f'getting {len(uris)} features')
audio_features = []
@ -761,7 +761,7 @@ class Network:
logger.info(f'populating {len(tracks)} features')
if isinstance(tracks, SimplifiedTrack):
audio_features = self.get_track_audio_features(uris=[tracks.uri])
audio_features = self.track_audio_features(uris=[tracks.uri])
if audio_features:
if len(audio_features) == 1:
@ -774,7 +774,7 @@ class Network:
elif isinstance(tracks, List):
if all(isinstance(i, SimplifiedTrack) for i in tracks):
audio_features = self.get_track_audio_features(uris=[i.uri for i in tracks])
audio_features = self.track_audio_features(uris=[i.uri for i in tracks])
if audio_features:
if len(audio_features) != len(tracks):
@ -791,7 +791,7 @@ class Network:
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.track)
def get_tracks(self, uris: List[Uri]) -> List[TrackFull]:
def tracks(self, uris: List[Uri]) -> List[TrackFull]:
logger.info(f'getting {len(uris)} tracks')
@ -806,9 +806,9 @@ class Network:
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.track)
def get_track(self, uri) -> Optional[TrackFull]:
def track(self, uri) -> Optional[TrackFull]:
track = self.get_tracks(uris=[uri])
track = self.tracks(uris=[uri])
if len(track) == 1:
return track[0]
else:
@ -816,7 +816,7 @@ class Network:
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.album)
def get_albums(self, uris: List[Uri]) -> List[AlbumFull]:
def albums(self, uris: List[Uri]) -> List[AlbumFull]:
logger.info(f'getting {len(uris)} albums')
@ -831,9 +831,9 @@ class Network:
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.album)
def get_album(self, uri: Uri) -> Optional[AlbumFull]:
def album(self, uri: Uri) -> Optional[AlbumFull]:
album = self.get_albums(uris=[uri])
album = self.albums(uris=[uri])
if len(album) == 1:
return album[0]
else:
@ -841,7 +841,7 @@ class Network:
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.artist)
def get_artists(self, uris) -> List[ArtistFull]:
def artists(self, uris) -> List[ArtistFull]:
logger.info(f'getting {len(uris)} artists')
@ -856,9 +856,9 @@ class Network:
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.artist)
def get_artist(self, uri) -> Optional[ArtistFull]:
def artist(self, uri) -> Optional[ArtistFull]:
artist = self.get_artists(uris=[uri])
artist = self.artists(uris=[uri])
if len(artist) == 1:
return artist[0]
else:
@ -866,7 +866,7 @@ class Network:
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.show)
def get_shows(self, uris) -> List[SimplifiedShow]:
def shows(self, uris) -> List[SimplifiedShow]:
logger.info(f'getting {len(uris)} shows')
@ -881,7 +881,7 @@ class Network:
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.show)
def get_show(self, uri, episodes: bool = True) -> Optional[ShowFull]:
def show(self, uri, episodes: bool = True) -> Optional[ShowFull]:
logger.info(f"retrieving {uri}")
@ -903,7 +903,7 @@ class Network:
@inject_uri(uri=False)
@uri_type_check(uris_type=Uri.ObjectType.episode)
def get_episodes(self, uris) -> List[EpisodeFull]:
def episodes(self, uris) -> List[EpisodeFull]:
logger.info(f'getting {len(uris)} episodes')
@ -918,7 +918,7 @@ class Network:
@inject_uri(uris=False)
@uri_type_check(uri_type=Uri.ObjectType.episode)
def get_episode(self, uri) -> EpisodeFull:
def episode(self, uri) -> EpisodeFull:
logger.info(f"retrieving {uri}")

View File

@ -56,7 +56,7 @@ class Player:
@property
def available_devices(self):
try:
return self.net.get_available_devices()
return self.net.available_devices()
except SpotifyNetworkException as e:
logger.exception(f'error retrieving current devices')
raise e
@ -64,7 +64,7 @@ class Player:
@property
def status(self):
try:
new_status = self.net.get_player()
new_status = self.net.player()
if new_status:
self.last_status = new_status
return self.last_status
@ -164,9 +164,9 @@ class Player:
if isinstance(state, bool):
try:
if device is not None:
self.net.set_shuffle(deviceid=device.id, state=state)
self.net.shuffle(deviceid=device.id, state=state)
else:
self.net.set_shuffle(state=state)
self.net.shuffle(state=state)
except SpotifyNetworkException:
logger.exception(f'error setting shuffle')
@ -183,9 +183,9 @@ class Player:
if 0 <= int(value) <= 100:
try:
if device is not None:
self.net.set_volume(value, deviceid=device.id)
self.net.volume(value, deviceid=device.id)
else:
self.net.set_volume(value)
self.net.volume(value)
except SpotifyNetworkException:
logger.exception(f'error setting volume to {value}')
else: