from google.cloud import firestore import logging from datetime import timedelta, datetime, timezone from typing import List, Optional from werkzeug.security import generate_password_hash from spotframework.net.network import Network as SpotifyNetwork from spotframework.model.uri import Uri from fmframework.net.network import Network as FmNetwork from music.db.user import DatabaseUser from music.model.user import User from music.model.playlist import Playlist, RecentsPlaylist, LastFMChartPlaylist, Sort from music.model.stats import Stats db = firestore.Client() logger = logging.getLogger(__name__) def refresh_token_database_callback(user): if isinstance(user, DatabaseUser): user_obj = get_user(user.user_id) user_obj.update_database({ 'access_token': user.access_token, 'refresh_token': user.refresh_token, 'last_refreshed': user.last_refreshed, 'token_expiry': user.token_expiry }) logger.debug(f'{user.user_id} database entry updated') else: logger.error('user has no attached id') def get_authed_spotify_network(username): user = get_user(username) if user is not None: if user.spotify_linked: spotify_keys = db.document('key/spotify').get().to_dict() user_obj = DatabaseUser(client_id=spotify_keys['clientid'], client_secret=spotify_keys['clientsecret'], refresh_token=user.refresh_token, user_id=username, access_token=user.access_token) user_obj.on_refresh.append(refresh_token_database_callback) if user.last_refreshed + timedelta(seconds=user.token_expiry - 1) \ < datetime.now(timezone.utc): user_obj.refresh_access_token() user_obj.refresh_info() return SpotifyNetwork(user_obj) else: logger.error('user spotify not linked') else: logger.error(f'user {username} not found') def get_authed_lastfm_network(username): user = get_user(username) if user: if user.lastfm_username: fm_keys = db.document('key/fm').get().to_dict() return FmNetwork(username=user.lastfm_username, api_key=fm_keys['clientid']) else: logger.error(f'{username} has no last.fm username') else: logger.error(f'user {username} not found') def get_users() -> List[User]: logger.info('retrieving users') return [parse_user_reference(user_snapshot=i) for i in db.collection(u'spotify_users').stream()] def get_user(username: str) -> Optional[User]: logger.info(f'retrieving {username}') users = [i for i in db.collection(u'spotify_users').where(u'username', u'==', username).stream()] if len(users) == 0: logger.error(f'user {username} not found') return None if len(users) > 1: logger.critical(f"multiple {username}'s found") return None return parse_user_reference(user_snapshot=users[0]) def parse_user_reference(user_ref=None, user_snapshot=None) -> User: if user_ref is None and user_snapshot is None: raise ValueError('no user object supplied') if user_ref is None: user_ref = user_snapshot.reference if user_snapshot is None: user_snapshot = user_ref.get() user_dict = user_snapshot.to_dict() return User(username=user_dict.get('username'), password=user_dict.get('password'), db_ref=user_ref, email=user_dict.get('email'), user_type=User.Type[user_dict.get('type')], last_login=user_dict.get('last_login'), last_refreshed=user_dict.get('last_refreshed'), locked=user_dict.get('locked'), validated=user_dict.get('validated'), spotify_linked=user_dict.get('spotify_linked'), access_token=user_dict.get('access_token'), refresh_token=user_dict.get('refresh_token'), token_expiry=user_dict.get('token_expiry'), lastfm_username=user_dict.get('lastfm_username')) def update_user(username: str, updates: dict) -> None: logger.debug(f'updating {username}') users = [i for i in db.collection(u'spotify_users').where(u'username', u'==', username).stream()] if len(users) == 0: logger.error(f'user {username} not found') return None if len(users) > 1: logger.critical(f"multiple {username}'s found") return None user = users[0].reference user.update(updates) def create_user(username: str, password: str): db.collection(u'spotify_users').add({ 'access_token': None, 'email': None, 'last_login': datetime.utcnow(), 'last_refreshed': None, 'locked': False, 'password': generate_password_hash(password), 'refresh_token': None, 'spotify_linked': False, 'type': 'user', 'username': username, 'validated': True }) def get_user_playlists(username: str) -> List[Playlist]: logger.info(f'getting playlists for {username}') user = get_user(username) if user: playlist_refs = [i for i in user.db_ref.collection(u'playlists').stream()] return [parse_playlist_reference(username=username, playlist_snapshot=i) for i in playlist_refs] else: logger.error(f'user {username} not found') def get_playlist(username: str = None, name: str = None) -> Optional[Playlist]: logger.info(f'retrieving {name} for {username}') user = get_user(username) if user: playlists = [i for i in user.db_ref.collection(u'playlists').where(u'name', u'==', name).stream()] if len(playlists) == 0: logger.error(f'playlist {name} for {user} not found') return None if len(playlists) > 1: logger.critical(f"multiple {name}'s for {user} found") return None return parse_playlist_reference(username=username, playlist_snapshot=playlists[0]) else: logger.error(f'user {username} not found') def parse_playlist_reference(username, playlist_ref=None, playlist_snapshot=None) -> Playlist: if playlist_ref is None and playlist_snapshot is None: raise ValueError('no playlist object supplied') if playlist_ref is None: playlist_ref = playlist_snapshot.reference if playlist_snapshot is None: playlist_snapshot = playlist_ref.get() playlist_dict = playlist_snapshot.to_dict() if playlist_dict.get('type') == 'default': return Playlist(uri=playlist_dict.get('uri'), name=playlist_dict.get('name'), username=username, db_ref=playlist_ref, include_recommendations=playlist_dict.get('include_recommendations', False), recommendation_sample=playlist_dict.get('recommendation_sample', 0), include_library_tracks=playlist_dict.get('include_library_tracks', False), parts=playlist_dict.get('parts'), playlist_references=playlist_dict.get('playlist_references'), shuffle=playlist_dict.get('shuffle'), sort=Sort[playlist_dict.get('sort', 'release_date')], description_overwrite=playlist_dict.get('description_overwrite'), description_suffix=playlist_dict.get('description_suffix'), lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0), lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0), lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0), lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0), lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0), lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0), lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh')) elif playlist_dict.get('type') == 'recents': return RecentsPlaylist(uri=playlist_dict.get('uri'), name=playlist_dict.get('name'), username=username, db_ref=playlist_ref, include_recommendations=playlist_dict.get('include_recommendations', False), recommendation_sample=playlist_dict.get('recommendation_sample', 0), include_library_tracks=playlist_dict.get('include_library_tracks', False), parts=playlist_dict.get('parts'), playlist_references=playlist_dict.get('playlist_references'), shuffle=playlist_dict.get('shuffle'), sort=Sort[playlist_dict.get('sort', 'release_date')], description_overwrite=playlist_dict.get('description_overwrite'), description_suffix=playlist_dict.get('description_suffix'), lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0), lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0), lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0), lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0), lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0), lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0), lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'), add_last_month=playlist_dict.get('add_last_month'), add_this_month=playlist_dict.get('add_this_month'), day_boundary=playlist_dict.get('day_boundary')) elif playlist_dict.get('type') == 'fmchart': return LastFMChartPlaylist(uri=playlist_dict.get('uri'), name=playlist_dict.get('name'), username=username, db_ref=playlist_ref, include_recommendations=playlist_dict.get('include_recommendations', False), recommendation_sample=playlist_dict.get('recommendation_sample', 0), include_library_tracks=playlist_dict.get('include_library_tracks', False), parts=playlist_dict.get('parts'), playlist_references=playlist_dict.get('playlist_references'), shuffle=playlist_dict.get('shuffle'), sort=Sort[playlist_dict.get('sort', 'release_date')], description_overwrite=playlist_dict.get('description_overwrite'), description_suffix=playlist_dict.get('description_suffix'), lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0), lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0), lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0), lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0), lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0), lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0), lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'), chart_limit=playlist_dict.get('chart_limit'), chart_range=FmNetwork.Range[playlist_dict.get('chart_range')]) def update_playlist(username: str, name: str, updates: dict) -> None: if len(updates) > 0: logger.debug(f'updating {name} for {username}') user = get_user(username) playlists = [i for i in user.db_ref.collection(u'playlists').where(u'name', u'==', name).stream()] if len(playlists) == 0: logger.error(f'playlist {name} for {username} not found') return None if len(playlists) > 1: logger.critical(f"multiple {name}'s for {username} found") return None playlist = playlists[0].reference playlist.update(updates) else: logger.debug(f'nothing to update for {name} for {username}') def delete_playlist(username: str, name: str) -> None: logger.info(f'deleting {name} for {username}') playlist = get_playlist(username=username, name=name) if playlist: playlist.db_ref.delete() else: logger.error(f'playlist {name} not found for {username}') def get_user_stats(username): logger.info(f'retrieving stats for {username}') user = get_user(username) if user: stats_refs = [i for i in user.db_ref.collection(u'stats').stream()] return [parse_user_stats_reference(username=username, stats_snapshot=i) for i in stats_refs] else: logger.error(f'user {username} not found') def get_stat(username: str = None, uri: Uri = None, name: str = None) -> Optional[Stats]: logger.info(f'retrieving {uri}/{name} stats for {username}') user = get_user(username) if user: if uri is not None: stats = [i for i in user.db_ref.collection(u'stats').where(u'uri', u'==', str(uri)).stream()] else: stats = [i for i in user.db_ref.collection(u'stats').where(u'name', u'==', name).stream()] if len(stats) == 0: logger.error(f'stat {uri} for {user} not found') return None if len(stats) > 1: logger.critical(f"multiple {uri} stats for {user} found") return None return parse_user_stats_reference(username=username, stats_snapshot=stats[0]) else: logger.error(f'user {username} not found') def parse_user_stats_reference(username, stats_ref=None, stats_snapshot=None) -> Stats: if stats_ref is None and stats_snapshot is None: raise ValueError('no user object supplied') if stats_ref is None: stats_ref = stats_snapshot.reference if stats_snapshot is None: stats_snapshot = stats_ref.get() stats_dict = stats_snapshot.to_dict() return Stats(name=stats_dict.get('name'), username=username, uri=stats_dict.get('uri'), artists=stats_dict.get('artists'), albums=stats_dict.get('albums'), tracks=stats_dict.get('tracks'), user_total=stats_dict.get('user_total'), db_ref=stats_ref) def update_stats(username: str, uri: Uri, updates: dict) -> None: if len(updates) > 0: logger.debug(f'updating {uri} stat for {username}') user = get_user(username) stats = [i for i in user.db_ref.collection(u'stats').where(u'uri', u'==', str(uri)).stream()] if len(stats) == 0: logger.error(f'stat {uri} for {username} not found') return None if len(stats) > 1: logger.critical(f"multiple {uri} stats for {username} found") return None stats[0].reference.update(updates) else: logger.debug(f'nothing to update for {uri} for {username}') def create_stat(username: str, uri: Uri): logger.info(f'creating {uri} stat for {username}') user = get_user(username=username) net = get_authed_spotify_network(username) playlist = net.get_playlist(uri=uri) if playlist is not None: if user is not None: db_ref = user.db_ref.collection(u'stats').document() db_ref.set({ 'uri': str(uri), 'name': playlist.name, 'artists': {}, 'albums': {}, 'tracks': {}, 'user_total': 0 }) return parse_user_stats_reference(stats_ref=db_ref) else: logger.error(f'no {username} user returned')