fully objectified model

This commit is contained in:
aj 2019-09-04 17:45:26 +01:00
parent 2f840f879b
commit 0bd9fac8f4
29 changed files with 737 additions and 199 deletions

View File

@ -1,21 +0,0 @@
# This file specifies files that are *not* uploaded to Google Cloud Platform
# using gcloud. It follows the same syntax as .gitignore, with the addition of
# "#!include" directives (which insert the entries of the given .gitignore-style
# file at that point).
#
# For more information, run:
# $ gcloud topic gcloudignore
#
.gcloudignore
# If you would like to upload your .git directory, .gitignore file or files
# from your .gitignore file, remove the corresponding line
# below:
.git
.gitignore
env
.idea
.spot
node_modules
#!include:.gitignore

2
.gitignore vendored
View File

@ -3,3 +3,5 @@ __pycache__
*.csv *.csv
.idea .idea
.spot .spot
scratch.py

View File

@ -1,4 +1,4 @@
from spotframework.net.user import User from spotframework.net.user import NetworkUser
from spotframework.net.network import Network from spotframework.net.network import Network
import spotframework.net.const as const import spotframework.net.const as const
import spotframework.io.json as json import spotframework.io.json as json
@ -33,10 +33,10 @@ if __name__ == '__main__':
try: try:
network = Network(User(os.environ['SPOTCLIENT'], network = Network(NetworkUser(os.environ['SPOTCLIENT'],
os.environ['SPOTSECRET'], os.environ['SPOTSECRET'],
os.environ['SPOTACCESS'], os.environ['SPOTACCESS'],
os.environ['SPOTREFRESH'])) os.environ['SPOTREFRESH']))
found = False found = False

View File

@ -1,4 +1,4 @@
from spotframework.net.user import User from spotframework.net.user import NetworkUser
from spotframework.net.network import Network from spotframework.net.network import Network
import spotframework.io.csv as csvwrite import spotframework.io.csv as csvwrite
@ -21,14 +21,14 @@ if __name__ == '__main__':
try: try:
network = Network(User(os.environ['SPOTCLIENT'], network = Network(NetworkUser(os.environ['SPOTCLIENT'],
os.environ['SPOTSECRET'], os.environ['SPOTSECRET'],
os.environ['SPOTACCESS'], os.environ['SPOTACCESS'],
os.environ['SPOTREFRESH'])) os.environ['SPOTREFRESH']))
playlists = network.get_user_playlists() playlists = network.get_user_playlists()
for playlist in playlists: for playlist in playlists:
playlist.tracks = network.get_playlist_tracks(playlist.playlistid) playlist.tracks = network.get_playlist_tracks(playlist.playlist_id)
path = sys.argv[1] path = sys.argv[1]

View File

@ -1,13 +1,12 @@
import spotframework.net.const as const import spotframework.net.const as const
from spotframework.net.network import Network from spotframework.net.network import Network
from spotframework.net.user import User from spotframework.net.user import NetworkUser
import spotframework.io.json as json import spotframework.io.json as json
import spotframework.util.monthstrings as monthstrings import spotframework.util.monthstrings as monthstrings
from spotframework.engine.playlistengine import PlaylistEngine from spotframework.engine.playlistengine import PlaylistEngine
from spotframework.engine.filter.shuffle import Shuffle from spotframework.engine.filter.shuffle import Shuffle
from spotframework.engine.filter.sortreversereleasedate import SortReverseReleaseDate from spotframework.engine.filter.sort import SortReverseReleaseDate
from spotframework.engine.filter.deduplicatebyid import DeduplicateByID from spotframework.engine.filter.deduplicate import DeduplicateByID, DeduplicateByName
from spotframework.engine.filter.deduplicatebyname import DeduplicateByName
import os import os
import datetime import datetime
@ -115,10 +114,10 @@ def go():
logger.critical('none to execute, terminating') logger.critical('none to execute, terminating')
return return
net = Network(User(os.environ['SPOTCLIENT'], net = Network(NetworkUser(os.environ['SPOTCLIENT'],
os.environ['SPOTSECRET'], os.environ['SPOTSECRET'],
os.environ['SPOTACCESS'], os.environ['SPOTACCESS'],
os.environ['SPOTREFRESH'])) os.environ['SPOTREFRESH']))
engine = PlaylistEngine(net) engine = PlaylistEngine(net)
engine.load_user_playlists() engine.load_user_playlists()

View File

@ -1,13 +1,13 @@
from spotframework.net.user import User from spotframework.net.user import NetworkUser
from spotframework.net.network import Network from spotframework.net.network import Network
import os import os
if __name__ == '__main__': if __name__ == '__main__':
network = Network(User(os.environ['SPOTCLIENT'], network = Network(NetworkUser(os.environ['SPOTCLIENT'],
os.environ['SPOTSECRET'], os.environ['SPOTSECRET'],
os.environ['SPOTACCESS'], os.environ['SPOTACCESS'],
os.environ['SPOTREFRESH'])) os.environ['SPOTREFRESH']))
print(network.user.access_token) print(network.user.access_token)

View File

@ -0,0 +1,47 @@
from abc import ABC, abstractmethod
from typing import List
from spotframework.model.track import Track
class AbstractProcessor(ABC):
def __init__(self, names: List[str] = None):
self.playlist_names = names
def has_targets(self):
if self.playlist_names:
return True
else:
return False
@abstractmethod
def process(self, tracks: List[Track]):
pass
class AbstractTestFilter(AbstractProcessor, ABC):
def __init__(self,
names: List[str] = None,
keep_failed: bool = True):
super().__init__(names)
self.keep_failed = keep_failed
@abstractmethod
def logic_test(self, track: Track):
pass
def process(self, tracks: List[Track]):
return_tracks = []
malformed_tracks = []
for track in tracks:
if self.logic_test(track):
return_tracks.append(track)
else:
malformed_tracks.append(track)
if self.keep_failed:
return_tracks += malformed_tracks
return return_tracks

View File

@ -1,11 +0,0 @@
from abc import ABC, abstractmethod
class AbstractProcessor(ABC):
def __init__(self, names=[]):
self.playlist_names = names
@abstractmethod
def process(self, tracks):
pass

View File

@ -0,0 +1,47 @@
from abc import ABC, abstractmethod
from .abstract import AbstractProcessor
import datetime
from typing import List
from spotframework.model.track import Track, PlaylistTrack
class Added(AbstractProcessor, ABC):
def __init__(self,
boundary: datetime.datetime,
names: List[str] = None,
keep_malformed_type: bool = True):
super().__init__(names)
self.boundary = boundary
self.keep_malformed_type = keep_malformed_type
@abstractmethod
def check_date(self, track: PlaylistTrack):
pass
def process(self, tracks: List[Track]):
return_tracks = []
malformed_tracks = []
for track in tracks:
if isinstance(track, PlaylistTrack):
if self.check_date(track):
return_tracks.append(track)
else:
malformed_tracks.append(track)
if self.keep_malformed_type:
return_tracks += malformed_tracks
return return_tracks
class AddedBefore(Added):
def check_date(self, track: PlaylistTrack):
return track.added_at < self.boundary
class AddedSince(Added):
def check_date(self, track: PlaylistTrack):
return track.added_at > self.boundary

View File

@ -1,17 +0,0 @@
from .abstractprocessor import AbstractProcessor
import datetime
class AddedBefore(AbstractProcessor):
def __init__(self, boundary, names=[]):
super().__init__(names)
self.boundary = boundary
def check_date(self, track):
added_at = datetime.datetime.fromisoformat(track['added_at'].replace('T', ' ').replace('Z', ''))
return added_at < self.boundary
def process(self, tracks):
return [i for i in tracks if self.check_date(i)]

View File

@ -1,17 +0,0 @@
from .abstractprocessor import AbstractProcessor
import datetime
class AddedSince(AbstractProcessor):
def __init__(self, boundary, names=[]):
super().__init__(names)
self.boundary = boundary
def check_date(self, track):
added_at = datetime.datetime.fromisoformat(track['added_at'].replace('T', ' ').replace('Z', ''))
return added_at > self.boundary
def process(self, tracks):
return [i for i in tracks if self.check_date(i)]

View File

@ -0,0 +1,45 @@
from spotframework.engine.filter.abstract import AbstractProcessor
from typing import List
from spotframework.model.track import Track, SpotifyTrack
class DeduplicateByID(AbstractProcessor):
def __init__(self,
names: List[str] = None,
keep_malformed_type: bool = True):
super().__init__(names)
self.keep_malformed_type = keep_malformed_type
def process(self, tracks: List[Track]):
return_tracks = []
malformed_tracks = []
for track in tracks:
if isinstance(track, SpotifyTrack):
if track.uri not in [i.uri for i in return_tracks]:
return_tracks.append(track)
else:
malformed_tracks.append(track)
if self.keep_malformed_type:
return_tracks += malformed_tracks
return return_tracks
class DeduplicateByName(AbstractProcessor):
def process(self, tracks: List[Track]):
return_tracks = []
for to_check in tracks:
for cache_track in return_tracks:
if to_check.name.lower() == cache_track.name.lower():
if to_check.artists[0].name.lower() == cache_track.artists[0].name.lower():
break
else:
return_tracks.append(to_check)
return return_tracks

View File

@ -1,13 +0,0 @@
from .abstractprocessor import AbstractProcessor
class DeduplicateByID(AbstractProcessor):
def process(self, tracks):
return_tracks = []
for track in tracks:
if track['track']['uri'] not in [i['track']['uri'] for i in return_tracks]:
return_tracks.append(track)
return return_tracks

View File

@ -1,19 +0,0 @@
from .abstractprocessor import AbstractProcessor
class DeduplicateByName(AbstractProcessor):
def process(self, tracks):
return_tracks = []
for to_check in tracks:
for cache_track in return_tracks:
if to_check['track']['name'].lower() == cache_track['track']['name'].lower():
if to_check['track']['artists'][0]['name'].lower() \
== cache_track['track']['artists'][0]['name'].lower():
break
else:
return_tracks.append(to_check)
return return_tracks

View File

@ -1,17 +0,0 @@
from .abstractprocessor import AbstractProcessor
import random
class RandomSample(AbstractProcessor):
def __init__(self, sample_size, names=[]):
super().__init__(names)
self.sample_size = sample_size
def process(self, tracks):
return_tracks = list(tracks)
random.shuffle(return_tracks)
return return_tracks[:self.sample_size]

View File

@ -1,9 +1,23 @@
from .abstractprocessor import AbstractProcessor from .abstract import AbstractProcessor
import random import random
from typing import List
from spotframework.model.track import Track
class Shuffle(AbstractProcessor): class Shuffle(AbstractProcessor):
def process(self, tracks): def process(self, tracks: List[Track]):
random.shuffle(tracks) random.shuffle(tracks)
return tracks return tracks
class RandomSample(Shuffle):
def __init__(self,
sample_size: int,
names: List[str] = None):
super().__init__(names)
self.sample_size = sample_size
def process(self, tracks: List[Track]):
return super().process(tracks)[:self.sample_size]

View File

@ -0,0 +1,31 @@
from .abstract import AbstractProcessor
from typing import List
from spotframework.model.track import Track
class SortReverseReleaseDate(AbstractProcessor):
def process(self, tracks: List[Track]):
tracks.sort(key=lambda x: x.album.release_date, reverse=True)
return tracks
class SortReleaseDate(AbstractProcessor):
def process(self, tracks: List[Track]):
tracks.sort(key=lambda x: x.album.release_date, reverse=False)
return tracks
class SortArtistName(AbstractProcessor):
def process(self, tracks: List[Track]):
tracks.sort(key=lambda x: x.artists[0].name, reverse=False)
return tracks
class SortReverseArtistName(AbstractProcessor):
def process(self, tracks: List[Track]):
tracks.sort(key=lambda x: x.artists[0].name, reverse=True)
return tracks

View File

@ -1,8 +0,0 @@
from .abstractprocessor import AbstractProcessor
class SortReverseReleaseDate(AbstractProcessor):
def process(self, tracks):
tracks.sort(key=lambda x: x['track']['album']['release_date'], reverse=True)
return tracks

View File

@ -3,14 +3,21 @@ import os
import logging import logging
import spotframework.util.monthstrings as monthstrings import spotframework.util.monthstrings as monthstrings
from spotframework.engine.filter.addedsince import AddedSince from spotframework.engine.filter.added import AddedSince
from typing import List
from spotframework.model.track import SpotifyTrack
from spotframework.model.playlist import Playlist
from spotframework.net.network import Network
from spotframework.engine.filter.abstract import AbstractProcessor
from datetime import datetime
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PlaylistEngine: class PlaylistEngine:
def __init__(self, net): def __init__(self, net: Network):
self.playlists = [] self.playlists = []
self.net = net self.net = net
@ -32,16 +39,24 @@ class PlaylistEngine:
else: else:
logger.error('error getting playlists') logger.error('error getting playlists')
def get_playlist_tracks(self, playlist): def get_playlist_tracks(self,
playlist: Playlist):
logger.info(f"pulling tracks for {playlist.name}") logger.info(f"pulling tracks for {playlist.name}")
tracks = self.net.get_playlist_tracks(playlist.playlistid) tracks = self.net.get_playlist_tracks(playlist.playlist_id)
if tracks and len(tracks) > 0: if tracks and len(tracks) > 0:
playlist.tracks = tracks playlist.tracks = tracks
else: else:
logger.error('error getting tracks') logger.error('error getting tracks')
def make_playlist(self, playlist_parts, processors=[], include_recommendations=False, recommendation_limit=10): def make_playlist(self,
playlist_parts: List[str],
processors: List[AbstractProcessor] = None,
include_recommendations: bool = False,
recommendation_limit: int = 10):
if processors is None:
processors = []
tracks = [] tracks = []
@ -56,39 +71,41 @@ class PlaylistEngine:
playlist_tracks = list(play.tracks) playlist_tracks = list(play.tracks)
for processor in [i for i in processors if play.name in [j for j in i.playlist_names]]: for processor in [i for i in processors if i.has_targets()]:
playlist_tracks = processor.process(playlist_tracks) if play.name in [i for i in processor.playlist_names]:
playlist_tracks = processor.process(playlist_tracks)
tracks += [i for i in playlist_tracks if i['is_local'] is False] tracks += [i for i in playlist_tracks if i.is_local is False]
else: else:
logger.warning(f"requested playlist {part} not found") logger.warning(f"requested playlist {part} not found")
if 'SLACKHOOK' in os.environ: if 'SLACKHOOK' in os.environ:
requests.post(os.environ['SLACKHOOK'], json={"text": f"spot playlists: {part} not found"}) requests.post(os.environ['SLACKHOOK'], json={"text": f"spot playlists: {part} not found"})
for processor in [i for i in processors if len(i.playlist_names) <= 0]: for processor in [i for i in processors if i.has_targets() is False]:
tracks = processor.process(tracks) tracks = processor.process(tracks)
tracks = [i['track'] for i in tracks]
if include_recommendations: if include_recommendations:
recommendations = self.net.get_recommendations(tracks=[i['id'] for i in tracks], recommendations = self.net.get_recommendations(tracks=[i.spotify_id for i in tracks],
response_limit=recommendation_limit) response_limit=recommendation_limit)
if recommendations and len(recommendations) > 0: if recommendations and len(recommendations) > 0:
tracks += recommendations['tracks'] tracks += recommendations
else: else:
logger.error('error getting recommendations') logger.error('error getting recommendations')
return tracks return tracks
def get_recent_playlist(self, def get_recent_playlist(self,
boundary_date, boundary_date: datetime,
recent_playlist_parts, recent_playlist_parts: List[str],
processors=[], processors: List[AbstractProcessor] = None,
include_recommendations=False, include_recommendations: bool = False,
recommendation_limit=10, recommendation_limit: int = 10,
add_this_month=False, add_this_month: bool = False,
add_last_month=False): add_last_month: bool = False):
if processors is None:
processors = []
this_month = monthstrings.get_this_month() this_month = monthstrings.get_this_month()
last_month = monthstrings.get_last_month() last_month = monthstrings.get_last_month()
@ -110,16 +127,22 @@ class PlaylistEngine:
include_recommendations=include_recommendations, include_recommendations=include_recommendations,
recommendation_limit=recommendation_limit) recommendation_limit=recommendation_limit)
def execute_playlist(self, tracks, playlist_id): def execute_playlist(self,
tracks: List[SpotifyTrack],
playlist_id: str):
resp = self.net.replace_playlist_tracks(playlist_id, [i['uri'] for i in tracks]) resp = self.net.replace_playlist_tracks(playlist_id, [i.uri for i in tracks])
if resp: if resp:
return resp return resp
else: else:
logger.error('error executing') logger.error('error executing')
return None return None
def change_description(self, playlistparts, playlist_id, overwrite=None, suffix=None): def change_description(self,
playlistparts: List[str],
playlist_id: str,
overwrite: bool = None,
suffix: str = None):
if overwrite: if overwrite:
string = overwrite string = overwrite

View File

@ -24,15 +24,15 @@ def export_playlist(playlist, path, name=None):
for track in playlist.tracks: for track in playlist.tracks:
trackdict = { trackdict = {
'name':track['track']['name'], 'name':track.name,
'album':track['track']['album']['name'], 'album':track.album.name,
'added':track['added_at'], 'added':track.added_at,
'track id':track['track']['id'], 'track id':track.spotify_id,
'album id':track['track']['album']['id'], 'album id':track.album.spotify_id,
'added by':track['added_by']['id']} 'added by':track.added_by.username}
trackdict['album artist'] = ', '.join(x['name'] for x in track['track']['album']['artists']) trackdict['album artist'] = ', '.join(x.name for x in track.album.artists)
trackdict['artist'] = ', '.join(x['name'] for x in track['track']['artists']) trackdict['artist'] = ', '.join(x.name for x in track.artists)
writer.writerow(trackdict) writer.writerow(trackdict)

View File

@ -0,0 +1,50 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import List
if TYPE_CHECKING:
from spotframework.model.artist import Artist
class Album:
def __init__(self, name: str, artists: List[Artist]):
self.name = name
self.artists = artists
def __str__(self):
artists = ' , '.join([i.name for i in self.artists]) if self.artists else 'n/a'
return f'{self.name} / {artists}'
class SpotifyAlbum(Album):
def __init__(self,
name: str,
artists: List[Artist],
href: str = None,
spotify_id: str = None,
uri: str = None,
genres: List[str] = None,
tracks: List = None,
release_date: str = None,
release_date_precision: str = None,
label: str = None,
popularity: int = None
):
super().__init__(name, artists)
self.href = href
self.spotify_id = spotify_id
self.uri = uri
self.genres = genres
self.tracks = tracks
self.release_date = release_date
self.release_date_precision = release_date_precision
self.label = label
self.popularity = popularity

View File

@ -0,0 +1,32 @@
from typing import List
class Artist:
def __init__(self, name: str):
self.name = name
def __str__(self):
return f'{self.name}'
class SpotifyArtist(Artist):
def __init__(self,
name: str,
href: str = None,
spotify_id: str = None,
uri: str = None,
genres: List[str] = None,
popularity: int = None
):
super().__init__(name)
self.href = href
self.spotify_id = spotify_id
self.uri = uri
self.genres = genres
self.popularity = popularity

View File

@ -1,14 +1,35 @@
from spotframework.model.user import User
class Playlist: class Playlist:
def __init__(self, playlistid, uri=None, name=None, userid=None): def __init__(self,
playlistid: str,
name: str = None,
owner: User = None,
description: str = None,
href: str = None,
uri: str = None,
collaborative: bool = None,
public: bool = None,
ext_spotify: str = None):
self.tracks = [] self.tracks = []
self.name = name self.name = name
self.playlistid = playlistid
self.userid = userid self.playlist_id = playlistid
self.owner = owner
self.description = description
self.href = href
self.uri = uri self.uri = uri
self.collaborative = collaborative
self.public = public
self.ext_spotify = ext_spotify
def has_tracks(self): def has_tracks(self):
if len(self.tracks) > 0: if len(self.tracks) > 0:
return True return True

View File

@ -0,0 +1,100 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import List
from datetime import datetime
if TYPE_CHECKING:
from spotframework.model.album import Album
from spotframework.model.artist import Artist
from spotframework.model.user import User
class Track:
def __init__(self,
name: str,
album: Album,
artists: List[Artist],
disc_number: int = None,
duration_ms: int = None,
excplicit: bool = None
):
self.name = name
self.album = album
self.artists = artists
self.disc_number = disc_number
self.duration_ms = duration_ms
self.explicit = excplicit
def __str__(self):
album = self.album.name if self.album else 'n/a'
artists = ' , '.join([i.name for i in self.artists]) if self.artists else 'n/a'
return f'{self.name} / {album} / {artists}'
class SpotifyTrack(Track):
def __init__(self,
name: str,
album: Album,
artists: List[Artist],
href: str = None,
spotify_id: str = None,
uri: str = None,
disc_number: int = None,
duration_ms: int = None,
explicit: bool = None,
is_playable: bool = None,
popularity: int = None
):
super().__init__(name=name, album=album, artists=artists,
disc_number=disc_number,
duration_ms=duration_ms,
excplicit=explicit)
self.href = href
self.spotify_id = spotify_id
self.uri = uri
self.is_playable = is_playable
self.popularity = popularity
class PlaylistTrack(SpotifyTrack):
def __init__(self,
name: str,
album: Album,
artists: List[Artist],
added_at: str,
added_by: User,
is_local: bool,
href: str = None,
spotify_id: str = None,
uri: str = None,
disc_number: int = None,
duration_ms: int = None,
explicit: bool = None,
is_playable: bool = None,
popularity: int = None
):
super().__init__(name=name, album=album, artists=artists,
href=href,
spotify_id=spotify_id,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
self.added_at = datetime.fromisoformat(added_at.replace('T', ' ').replace('Z', ''))
self.added_by = added_by
self.is_local = is_local

View File

@ -0,0 +1,21 @@
class User:
def __init__(self,
username: str,
href: str = None,
uri: str = None,
display_name: str = None,
ext_spotify: str = None):
self.username = username
self.href = href
self.uri = uri
self.display_name = display_name
self.ext_spotify = ext_spotify
def __str__(self):
return f'{self.username}'

View File

@ -2,7 +2,9 @@ import requests
import random import random
import logging import logging
import time import time
from typing import List
from . import const from . import const
from spotframework.net.parse import parse
from spotframework.model.playlist import Playlist from spotframework.model.playlist import Playlist
limit = 50 limit = 50
@ -96,7 +98,7 @@ class Network:
return None return None
def get_playlist(self, playlistid): def get_playlist(self, playlistid: str):
logger.info(f"{playlistid}") logger.info(f"{playlistid}")
@ -140,14 +142,7 @@ class Network:
if resp: if resp:
for responseplaylist in resp['items']: for responseplaylist in resp['items']:
playlists.append(parse.parse_playlist(responseplaylist))
playlist = Playlist(responseplaylist['id'], responseplaylist['uri'])
playlist.name = responseplaylist['name']
playlist.userid = responseplaylist['owner']['id']
playlists.append(playlist)
# playlists = playlists + resp['items']
if resp.get('next', None): if resp.get('next', None):
more_playlists = self.get_playlists(offset + limit) more_playlists = self.get_playlists(offset + limit)
@ -167,7 +162,7 @@ class Network:
playlists = self.get_playlists() playlists = self.get_playlists()
if playlists: if playlists:
return list(filter(lambda x: x.userid == self.user.username, playlists)) return list(filter(lambda x: x.owner.username == self.user.username, playlists))
else: else:
logger.error('no playlists returned to filter') logger.error('no playlists returned to filter')
return None return None
@ -184,7 +179,7 @@ class Network:
if resp: if resp:
if resp.get('items', None): if resp.get('items', None):
tracks += resp['items'] tracks += [parse.parse_track(i) for i in resp.get('items', None)]
else: else:
logger.warning(f'{playlistid} no items returned') logger.warning(f'{playlistid} no items returned')
@ -366,7 +361,7 @@ class Network:
logger.error('error updating details') logger.error('error updating details')
return None return None
def add_playlist_tracks(self, playlistid, uris): def add_playlist_tracks(self, playlistid: str, uris: List[str]):
logger.info(f"{playlistid}") logger.info(f"{playlistid}")
@ -410,7 +405,34 @@ class Network:
else: else:
resp = self._make_get_request('getRecommendations', 'recommendations', params=params) resp = self._make_get_request('getRecommendations', 'recommendations', params=params)
if resp: if resp:
return resp if 'tracks' in resp:
return [parse.parse_track(i) for i in resp['tracks']]
else:
logger.error('no tracks returned')
return None
else: else:
logger.error('error getting recommendations') logger.error('error getting recommendations')
return None return None
def write_playlist_object(self,
playlist: Playlist,
append_tracks: bool = False):
if playlist.playlist_id:
if playlist.tracks == -1:
self.replace_playlist_tracks(playlist.playlist_id, [])
elif playlist.tracks:
if append_tracks:
self.add_playlist_tracks(playlist.playlist_id, [i.uri for i in playlist.tracks])
else:
self.replace_playlist_tracks(playlist.playlist_id, [i.uri for i in playlist.tracks])
if playlist.name or playlist.collaborative or playlist.public or playlist.description:
self.change_playlist_details(playlist.playlist_id,
playlist.name,
playlist.public,
playlist.collaborative,
playlist.description)
else:
logger.error('playlist has no id')

View File

View File

@ -0,0 +1,186 @@
from spotframework.model.artist import Artist, SpotifyArtist
from spotframework.model.album import Album, SpotifyAlbum
from spotframework.model.track import Track, SpotifyTrack, PlaylistTrack
from spotframework.model.playlist import Playlist
from spotframework.model.user import User
def parse_artist(artist_dict) -> Artist:
name = artist_dict.get('name', None)
href = artist_dict.get('href', None)
spotify_id = artist_dict.get('id', None)
uri = artist_dict.get('uri', None)
genres = artist_dict.get('genres', None)
popularity = artist_dict.get('popularity', None)
if name is None:
raise KeyError('artist name not found')
return SpotifyArtist(name,
href=href,
spotify_id=spotify_id,
uri=uri,
genres=genres,
popularity=popularity)
def parse_album(album_dict) -> Album:
name = album_dict.get('name', None)
if name is None:
raise KeyError('album name not found')
artists = [parse_artist(i) for i in album_dict.get('artists', [])]
href = album_dict.get('href', None)
spotify_id = album_dict.get('id', None)
uri = album_dict.get('uri', None)
genres = album_dict.get('genres', None)
tracks = [parse_track(i) for i in album_dict.get('tracks', [])]
release_date = album_dict.get('release_date', None)
release_date_precision = album_dict.get('release_date_precision', None)
label = album_dict.get('label', None)
popularity = album_dict.get('popularity', None)
return SpotifyAlbum(name=name,
artists=artists,
href=href,
spotify_id=spotify_id,
uri=uri,
genres=genres,
tracks=tracks,
release_date=release_date,
release_date_precision=release_date_precision,
label=label,
popularity=popularity)
def parse_track(track_dict) -> Track:
if 'track' in track_dict:
track = track_dict.get('track', None)
else:
track = track_dict
name = track.get('name', None)
if name is None:
raise KeyError('track name not found')
if track.get('album', None):
album = parse_album(track['album'])
else:
album = None
# print(album.name)
artists = [parse_artist(i) for i in track.get('artists', [])]
href = track.get('href', None)
spotify_id = track.get('id', None)
uri = track.get('uri', None)
disc_number = track.get('disc_number', None)
duration_ms = track.get('duration_ms', None)
explicit = track.get('explicit', None)
is_playable = track.get('is_playable', None)
popularity = track.get('popularity', None)
added_by = parse_user(track_dict.get('added_by')) if track_dict.get('added_by', None) else None
added_at = track_dict.get('added_at', None)
is_local = track_dict.get('is_local', None)
# print(album.name)
if added_at or added_by or is_local:
return PlaylistTrack(name=name,
album=album,
artists=artists,
added_at=added_at,
added_by=added_by,
is_local=is_local,
href=href,
spotify_id=spotify_id,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
else:
return SpotifyTrack(name=name,
album=album,
artists=artists,
href=href,
spotify_id=spotify_id,
uri=uri,
disc_number=disc_number,
duration_ms=duration_ms,
explicit=explicit,
is_playable=is_playable,
popularity=popularity)
def parse_user(user_dict):
display_name = user_dict.get('display_name', None)
spotify_id = user_dict.get('id', None)
href = user_dict.get('href', None)
uri = user_dict.get('uri', None)
return User(spotify_id,
href=href,
uri=uri,
display_name=display_name)
def parse_playlist(playlist_dict):
collaborative = playlist_dict.get('collaborative', None)
ext_spotify = None
if playlist_dict.get('external_urls', None):
if playlist_dict['external_urls'].get('spotify', None):
ext_spotify = playlist_dict['external_urls']['spotify']
href = playlist_dict.get('href', None)
playlist_id = playlist_dict.get('id', None)
description = playlist_dict.get('description', None)
name = playlist_dict.get('name', None)
if playlist_dict.get('owner', None):
owner = parse_user(playlist_dict.get('owner'))
else:
owner = None
public = playlist_dict.get('public', None)
uri = playlist_dict.get('uri', None)
return Playlist(playlistid=playlist_id,
name=name,
owner=owner,
description=description,
href=href,
uri=uri,
collaborative=collaborative,
public=public,
ext_spotify=ext_spotify)

View File

@ -1,13 +1,16 @@
import requests import requests
from spotframework.model.user import User
from base64 import b64encode from base64 import b64encode
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class User: class NetworkUser(User):
def __init__(self, client_id, client_secret, access_token, refresh_token): def __init__(self, client_id, client_secret, access_token, refresh_token):
super().__init__('')
self.accesstoken = access_token self.accesstoken = access_token
self.refreshtoken = refresh_token self.refreshtoken = refresh_token
@ -15,8 +18,7 @@ class User:
self.client_secret = client_secret self.client_secret = client_secret
self.refresh_token() self.refresh_token()
self.refresh_info()
self.username = self.get_info()['id']
def refresh_token(self): def refresh_token(self):
@ -33,6 +35,25 @@ class User:
else: else:
logger.error(f'http error {req.status_code}') logger.error(f'http error {req.status_code}')
def refresh_info(self):
info = self.get_info()
if info.get('display_name', None):
self.display_name = info['display_name']
if info.get('external_urls', None):
if info['external_urls'].get('spotify', None):
self.ext_spotify = info['external_urls']['spotify']
if info.get('href', None):
self.href = info['href']
if info.get('id', None):
self.username = info['id']
if info.get('uri', None):
self.uri = info['uri']
def get_info(self): def get_info(self):
headers = {'Authorization': 'Bearer %s' % self.accesstoken} headers = {'Authorization': 'Bearer %s' % self.accesstoken}