457 lines
17 KiB
Python
457 lines
17 KiB
Python
from google.cloud import firestore
|
|
import logging
|
|
from datetime import timedelta, datetime, timezone
|
|
from typing import List, Optional
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
from spotframework.net.network import Network as SpotifyNetwork
|
|
from fmframework.net.network import Network as FmNetwork
|
|
from music.db.user import DatabaseUser
|
|
from music.model.user import User
|
|
from music.model.playlist import Playlist, RecentsPlaylist, LastFMChartPlaylist, Sort
|
|
from music.model.tag import Tag
|
|
|
|
db = firestore.Client()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def refresh_token_database_callback(user):
|
|
if isinstance(user, DatabaseUser):
|
|
user_obj = get_user(user.user_id)
|
|
|
|
user_obj.update_database({
|
|
'access_token': user.access_token,
|
|
'refresh_token': user.refresh_token,
|
|
'last_refreshed': user.last_refreshed,
|
|
'token_expiry': user.token_expiry
|
|
})
|
|
logger.debug(f'{user.user_id} database entry updated')
|
|
else:
|
|
logger.error('user has no attached id')
|
|
|
|
|
|
def get_authed_spotify_network(username):
|
|
|
|
user = get_user(username)
|
|
if user is not None:
|
|
if user.spotify_linked:
|
|
spotify_keys = db.document('key/spotify').get().to_dict()
|
|
|
|
user_obj = DatabaseUser(client_id=spotify_keys['clientid'],
|
|
client_secret=spotify_keys['clientsecret'],
|
|
refresh_token=user.refresh_token,
|
|
user_id=username,
|
|
access_token=user.access_token)
|
|
user_obj.on_refresh.append(refresh_token_database_callback)
|
|
|
|
if user.last_refreshed + timedelta(seconds=user.token_expiry - 1) \
|
|
< datetime.now(timezone.utc):
|
|
user_obj.refresh_access_token()
|
|
|
|
user_obj.refresh_info()
|
|
return SpotifyNetwork(user_obj)
|
|
else:
|
|
logger.error('user spotify not linked')
|
|
else:
|
|
logger.error(f'user {username} not found')
|
|
|
|
|
|
def get_authed_lastfm_network(username):
|
|
|
|
user = get_user(username)
|
|
if user:
|
|
if user.lastfm_username:
|
|
fm_keys = db.document('key/fm').get().to_dict()
|
|
return FmNetwork(username=user.lastfm_username, api_key=fm_keys['clientid'])
|
|
else:
|
|
logger.error(f'{username} has no last.fm username')
|
|
else:
|
|
logger.error(f'user {username} not found')
|
|
|
|
|
|
def get_users() -> List[User]:
|
|
logger.info('retrieving users')
|
|
return [parse_user_reference(user_snapshot=i) for i in db.collection(u'spotify_users').stream()]
|
|
|
|
|
|
def get_user(username: str) -> Optional[User]:
|
|
logger.info(f'retrieving {username}')
|
|
|
|
users = [i for i in db.collection(u'spotify_users').where(u'username', u'==', username).stream()]
|
|
|
|
if len(users) == 0:
|
|
logger.error(f'user {username} not found')
|
|
return None
|
|
if len(users) > 1:
|
|
logger.critical(f"multiple {username}'s found")
|
|
return None
|
|
|
|
return parse_user_reference(user_snapshot=users[0])
|
|
|
|
|
|
def parse_user_reference(user_ref=None, user_snapshot=None) -> User:
|
|
if user_ref is None and user_snapshot is None:
|
|
raise ValueError('no user object supplied')
|
|
|
|
if user_ref is None:
|
|
user_ref = user_snapshot.reference
|
|
|
|
if user_snapshot is None:
|
|
user_snapshot = user_ref.get()
|
|
|
|
user_dict = user_snapshot.to_dict()
|
|
|
|
return User(username=user_dict.get('username'),
|
|
password=user_dict.get('password'),
|
|
db_ref=user_ref,
|
|
email=user_dict.get('email'),
|
|
user_type=User.Type[user_dict.get('type')],
|
|
last_login=user_dict.get('last_login'),
|
|
last_refreshed=user_dict.get('last_refreshed'),
|
|
locked=user_dict.get('locked'),
|
|
validated=user_dict.get('validated'),
|
|
|
|
spotify_linked=user_dict.get('spotify_linked'),
|
|
access_token=user_dict.get('access_token'),
|
|
refresh_token=user_dict.get('refresh_token'),
|
|
token_expiry=user_dict.get('token_expiry'),
|
|
lastfm_username=user_dict.get('lastfm_username'))
|
|
|
|
|
|
def update_user(username: str, updates: dict) -> None:
|
|
logger.debug(f'updating {username}')
|
|
|
|
users = [i for i in db.collection(u'spotify_users').where(u'username', u'==', username).stream()]
|
|
|
|
if len(users) == 0:
|
|
logger.error(f'user {username} not found')
|
|
return None
|
|
if len(users) > 1:
|
|
logger.critical(f"multiple {username}'s found")
|
|
return None
|
|
|
|
user = users[0].reference
|
|
user.update(updates)
|
|
|
|
|
|
def create_user(username: str, password: str):
|
|
db.collection(u'spotify_users').add({
|
|
'access_token': None,
|
|
'email': None,
|
|
'last_login': datetime.utcnow(),
|
|
'last_refreshed': None,
|
|
'locked': False,
|
|
'password': generate_password_hash(password),
|
|
'refresh_token': None,
|
|
'spotify_linked': False,
|
|
'type': 'user',
|
|
'username': username,
|
|
'validated': True
|
|
})
|
|
|
|
|
|
def get_user_playlists(username: str) -> List[Playlist]:
|
|
logger.info(f'getting playlists for {username}')
|
|
|
|
user = get_user(username)
|
|
|
|
if user:
|
|
playlist_refs = [i for i in user.db_ref.collection(u'playlists').stream()]
|
|
|
|
return [parse_playlist_reference(username=username, playlist_snapshot=i) for i in playlist_refs]
|
|
else:
|
|
logger.error(f'user {username} not found')
|
|
|
|
|
|
def get_playlist(username: str = None, name: str = None) -> Optional[Playlist]:
|
|
logger.info(f'retrieving {name} for {username}')
|
|
|
|
user = get_user(username)
|
|
|
|
if user:
|
|
|
|
playlists = [i for i in user.db_ref.collection(u'playlists').where(u'name', u'==', name).stream()]
|
|
|
|
if len(playlists) == 0:
|
|
logger.error(f'playlist {name} for {user} not found')
|
|
return None
|
|
if len(playlists) > 1:
|
|
logger.critical(f"multiple {name}'s for {user} found")
|
|
return None
|
|
|
|
return parse_playlist_reference(username=username, playlist_snapshot=playlists[0])
|
|
else:
|
|
logger.error(f'user {username} not found')
|
|
|
|
|
|
def parse_playlist_reference(username, playlist_ref=None, playlist_snapshot=None) -> Playlist:
|
|
if playlist_ref is None and playlist_snapshot is None:
|
|
raise ValueError('no playlist object supplied')
|
|
|
|
if playlist_ref is None:
|
|
playlist_ref = playlist_snapshot.reference
|
|
|
|
if playlist_snapshot is None:
|
|
playlist_snapshot = playlist_ref.get()
|
|
|
|
playlist_dict = playlist_snapshot.to_dict()
|
|
|
|
if playlist_dict.get('type') == 'default':
|
|
return Playlist(uri=playlist_dict.get('uri'),
|
|
name=playlist_dict.get('name'),
|
|
username=username,
|
|
|
|
db_ref=playlist_ref,
|
|
|
|
include_recommendations=playlist_dict.get('include_recommendations', False),
|
|
recommendation_sample=playlist_dict.get('recommendation_sample', 0),
|
|
include_library_tracks=playlist_dict.get('include_library_tracks', False),
|
|
|
|
parts=playlist_dict.get('parts'),
|
|
playlist_references=playlist_dict.get('playlist_references'),
|
|
shuffle=playlist_dict.get('shuffle'),
|
|
|
|
sort=Sort[playlist_dict.get('sort', 'release_date')],
|
|
|
|
description_overwrite=playlist_dict.get('description_overwrite'),
|
|
description_suffix=playlist_dict.get('description_suffix'),
|
|
|
|
last_updated=playlist_dict.get('last_updated'),
|
|
|
|
lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0),
|
|
lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0),
|
|
lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0),
|
|
|
|
lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0),
|
|
lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0),
|
|
lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0),
|
|
|
|
lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'))
|
|
|
|
elif playlist_dict.get('type') == 'recents':
|
|
return RecentsPlaylist(uri=playlist_dict.get('uri'),
|
|
name=playlist_dict.get('name'),
|
|
username=username,
|
|
|
|
db_ref=playlist_ref,
|
|
|
|
include_recommendations=playlist_dict.get('include_recommendations', False),
|
|
recommendation_sample=playlist_dict.get('recommendation_sample', 0),
|
|
include_library_tracks=playlist_dict.get('include_library_tracks', False),
|
|
|
|
parts=playlist_dict.get('parts'),
|
|
playlist_references=playlist_dict.get('playlist_references'),
|
|
shuffle=playlist_dict.get('shuffle'),
|
|
|
|
sort=Sort[playlist_dict.get('sort', 'release_date')],
|
|
|
|
description_overwrite=playlist_dict.get('description_overwrite'),
|
|
description_suffix=playlist_dict.get('description_suffix'),
|
|
|
|
last_updated=playlist_dict.get('last_updated'),
|
|
|
|
lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0),
|
|
lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0),
|
|
lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0),
|
|
|
|
lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0),
|
|
lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0),
|
|
lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0),
|
|
|
|
lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'),
|
|
|
|
add_last_month=playlist_dict.get('add_last_month'),
|
|
add_this_month=playlist_dict.get('add_this_month'),
|
|
day_boundary=playlist_dict.get('day_boundary'))
|
|
|
|
elif playlist_dict.get('type') == 'fmchart':
|
|
return LastFMChartPlaylist(uri=playlist_dict.get('uri'),
|
|
name=playlist_dict.get('name'),
|
|
username=username,
|
|
|
|
db_ref=playlist_ref,
|
|
|
|
include_recommendations=playlist_dict.get('include_recommendations', False),
|
|
recommendation_sample=playlist_dict.get('recommendation_sample', 0),
|
|
include_library_tracks=playlist_dict.get('include_library_tracks', False),
|
|
|
|
parts=playlist_dict.get('parts'),
|
|
playlist_references=playlist_dict.get('playlist_references'),
|
|
shuffle=playlist_dict.get('shuffle'),
|
|
|
|
sort=Sort[playlist_dict.get('sort', 'release_date')],
|
|
|
|
description_overwrite=playlist_dict.get('description_overwrite'),
|
|
description_suffix=playlist_dict.get('description_suffix'),
|
|
|
|
last_updated=playlist_dict.get('last_updated'),
|
|
|
|
lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0),
|
|
lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0),
|
|
lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0),
|
|
|
|
lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0),
|
|
lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0),
|
|
lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0),
|
|
|
|
lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'),
|
|
|
|
chart_limit=playlist_dict.get('chart_limit'),
|
|
chart_range=FmNetwork.Range[playlist_dict.get('chart_range')])
|
|
|
|
|
|
def update_playlist(username: str, name: str, updates: dict) -> None:
|
|
if len(updates) > 0:
|
|
logger.debug(f'updating {name} for {username}')
|
|
|
|
user = get_user(username)
|
|
|
|
playlists = [i for i in user.db_ref.collection(u'playlists').where(u'name', u'==', name).stream()]
|
|
|
|
if len(playlists) == 0:
|
|
logger.error(f'playlist {name} for {username} not found')
|
|
return None
|
|
if len(playlists) > 1:
|
|
logger.critical(f"multiple {name}'s for {username} found")
|
|
return None
|
|
|
|
playlist = playlists[0].reference
|
|
playlist.update(updates)
|
|
else:
|
|
logger.debug(f'nothing to update for {name} for {username}')
|
|
|
|
|
|
def delete_playlist(username: str, name: str) -> None:
|
|
logger.info(f'deleting {name} for {username}')
|
|
|
|
playlist = get_playlist(username=username, name=name)
|
|
|
|
if playlist:
|
|
playlist.db_ref.delete()
|
|
else:
|
|
logger.error(f'playlist {name} not found for {username}')
|
|
|
|
|
|
def get_user_tags(username: str) -> List[Tag]:
|
|
logger.info(f'getting tags for {username}')
|
|
|
|
user = get_user(username)
|
|
|
|
if user:
|
|
tag_refs = [i for i in user.db_ref.collection(u'tags').stream()]
|
|
|
|
return [parse_tag_reference(username=username, tag_snapshot=i) for i in tag_refs]
|
|
else:
|
|
logger.error(f'user {username} not found')
|
|
|
|
|
|
def get_tag(username: str = None, tag_id: str = None) -> Optional[Tag]:
|
|
logger.info(f'retrieving {tag_id} for {username}')
|
|
|
|
user = get_user(username)
|
|
|
|
if user:
|
|
|
|
tags = [i for i in user.db_ref.collection(u'tags').where(u'tag_id', u'==', tag_id).stream()]
|
|
|
|
if len(tags) == 0:
|
|
logger.error(f'tag {tag_id} for {user} not found')
|
|
return None
|
|
if len(tags) > 1:
|
|
logger.critical(f"multiple {tag_id}'s for {user} found")
|
|
return None
|
|
|
|
return parse_tag_reference(username=username, tag_snapshot=tags[0])
|
|
else:
|
|
logger.error(f'user {username} not found')
|
|
|
|
|
|
def parse_tag_reference(username, tag_ref=None, tag_snapshot=None) -> Tag:
|
|
if tag_ref is None and tag_snapshot is None:
|
|
raise ValueError('no tag object supplied')
|
|
|
|
if tag_ref is None:
|
|
tag_ref = tag_snapshot.reference
|
|
|
|
if tag_snapshot is None:
|
|
tag_snapshot = tag_ref.get()
|
|
|
|
tag_dict = tag_snapshot.to_dict()
|
|
|
|
return Tag(tag_id=tag_dict['tag_id'],
|
|
name=tag_dict.get('name', 'n/a'),
|
|
username=username,
|
|
|
|
db_ref=tag_ref,
|
|
|
|
tracks=tag_dict.get('tracks', []),
|
|
albums=tag_dict.get('albums', []),
|
|
artists=tag_dict.get('artists', []),
|
|
|
|
count=tag_dict.get('count', 0),
|
|
proportion=tag_dict.get('proportion', 0.0),
|
|
total_user_scrobbles=tag_dict.get('total_user_scrobbles', 0),
|
|
|
|
last_updated=tag_dict.get('last_updated'))
|
|
|
|
|
|
def update_tag(username: str, tag_id: str, updates: dict) -> None:
|
|
if len(updates) > 0:
|
|
logger.debug(f'updating {tag_id} for {username}')
|
|
|
|
user = get_user(username)
|
|
|
|
tags = [i for i in user.db_ref.collection(u'tags').where(u'tag_id', u'==', tag_id).stream()]
|
|
|
|
if len(tags) == 0:
|
|
logger.error(f'tag {tag_id} for {username} not found')
|
|
return None
|
|
if len(tags) > 1:
|
|
logger.critical(f"multiple {tag_id}'s for {username} found")
|
|
return None
|
|
|
|
tag = tags[0].reference
|
|
tag.update(updates)
|
|
else:
|
|
logger.debug(f'nothing to update for {tag_id} for {username}')
|
|
|
|
|
|
def delete_tag(username: str, tag_id: str) -> bool:
|
|
logger.info(f'deleting {tag_id} for {username}')
|
|
|
|
tag = get_tag(username=username, tag_id=tag_id)
|
|
|
|
if tag:
|
|
tag.db_ref.delete()
|
|
return True
|
|
else:
|
|
logger.error(f'playlist {tag_id} not found for {username}')
|
|
return False
|
|
|
|
|
|
def create_tag(username: str, tag_id: str):
|
|
user = get_user(username)
|
|
|
|
if user is None:
|
|
logger.error(f'{username} not found')
|
|
return None
|
|
|
|
if tag_id in [i.tag_id for i in get_user_tags(username)]:
|
|
logger.error(f'{tag_id} already exists for {username}')
|
|
return None
|
|
|
|
user.db_ref.collection(u'tags').add({
|
|
'tag_id': tag_id,
|
|
'name': tag_id,
|
|
|
|
'tracks': [],
|
|
'albums': [],
|
|
'artists': [],
|
|
|
|
'count': 0,
|
|
'proportion': 0.0,
|
|
'total_user_scrobbles': 0,
|
|
'last_updated': None
|
|
})
|