# Source: Mixonomer/music/db/database.py (423 lines, 16 KiB, Python)

from google.cloud import firestore
import logging
from datetime import timedelta, datetime, timezone
from typing import List, Optional
from werkzeug.security import generate_password_hash
from spotframework.net.network import Network as SpotifyNetwork
from fmframework.net.network import Network as FmNetwork
# 2019-10-19 17:14:11 +01:00 (timestamp from the web view; not part of the code)
from music.db.user import DatabaseUser
# 2019-10-23 14:44:17 +01:00 (timestamp from the web view; not part of the code)
from music.model.user import User
from music.model.playlist import Playlist, RecentsPlaylist, LastFMChartPlaylist, Sort
# 2020-02-03 09:44:33 +00:00 (timestamp from the web view; not part of the code)
from music.model.tag import Tag
# Firestore client shared by every helper in this module; it is constructed
# when the module is imported.
db = firestore.Client()
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def refresh_token_database_callback(user):
    """Write a DatabaseUser's current token fields back to its stored record.

    This function is appended to ``on_refresh`` of the ``DatabaseUser`` built
    in ``get_authed_spotify_network``; it is presumably invoked by that object
    after its access token has been refreshed.

    :param user: object whose tokens changed; anything that is not a
        ``DatabaseUser`` is rejected with an error log because it carries no
        ``user_id`` to look the record up by
    :return: None
    """
    if not isinstance(user, DatabaseUser):
        logger.error('user has no attached id')
        return

    user_obj = get_user(user.user_id)
    if user_obj is None:
        # get_user returns None (and logs the reason) when the username is
        # missing or duplicated; without a record there is nothing to update.
        logger.error(f'{user.user_id} database entry not updated, user not found')
        return

    user_obj.update_database({
        'access_token': user.access_token,
        'refresh_token': user.refresh_token,
        'last_refreshed': user.last_refreshed,
        'token_expiry': user.token_expiry
    })

    logger.debug(f'{user.user_id} database entry updated')
def get_authed_spotify_network(username):
    """Build a ``SpotifyNetwork`` authenticated as the stored user ``username``.

    The Spotify client id/secret are read from the ``key/spotify`` document
    and combined with the user's stored tokens. The access token is refreshed
    first when the stored expiry has passed, or when the expiry metadata is
    missing.

    :param username: username of the stored user to authenticate as
    :return: a ``SpotifyNetwork`` wrapping the user's credentials, or None
        (after logging) when the user does not exist or has not linked Spotify
    """
    user = get_user(username)
    if user is None:
        logger.error(f'user {username} not found')
        return None

    if not user.spotify_linked:
        logger.error('user spotify not linked')
        return None

    spotify_keys = db.document('key/spotify').get().to_dict()

    user_obj = DatabaseUser(client_id=spotify_keys['clientid'],
                            client_secret=spotify_keys['clientsecret'],
                            refresh_token=user.refresh_token,
                            user_id=username,
                            access_token=user.access_token)
    # Persist any refreshed tokens back to the database.
    user_obj.on_refresh.append(refresh_token_database_callback)

    if user.last_refreshed is None or user.token_expiry is None:
        # Records start with last_refreshed=None (see create_user) and may lack
        # token_expiry; without both the expiry cannot be computed, so treat
        # the token as expired instead of raising TypeError on the arithmetic.
        token_expired = True
    else:
        # One second of slack so a token about to expire is refreshed too.
        expires_at = user.last_refreshed + timedelta(seconds=user.token_expiry - 1)
        token_expired = expires_at < datetime.now(timezone.utc)

    if token_expired:
        user_obj.refresh_access_token()

    # NOTE(review): assumed to run whether or not the token was refreshed;
    # the original indentation was lost - confirm.
    user_obj.refresh_info()
    return SpotifyNetwork(user_obj)
def get_authed_lastfm_network(username):
    """Build an ``FmNetwork`` for the Last.fm account of stored user ``username``.

    The API key is read from the ``clientid`` field of the ``key/fm`` document.

    :param username: username of the stored user
    :return: an ``FmNetwork`` for the user's Last.fm username, or None (after
        logging) when the user is not found or has no Last.fm username set
    """
    account = get_user(username)

    if not account:
        logger.error(f'user {username} not found')
        return None

    if not account.lastfm_username:
        logger.error(f'{username} has no last.fm username')
        return None

    fm_keys = db.document('key/fm').get().to_dict()
    return FmNetwork(username=account.lastfm_username, api_key=fm_keys['clientid'])
# 2019-10-23 14:44:17 +01:00 (timestamp from the web view; not part of the code)
def get_users() -> List[User]:
    """Return every document of the ``spotify_users`` collection as a ``User``."""
    logger.info('retrieving users')

    snapshots = db.collection(u'spotify_users').stream()
    return [parse_user_reference(user_snapshot=snapshot) for snapshot in snapshots]
def get_user(username: str) -> Optional[User]:
    """Look up the single stored user whose ``username`` field matches.

    :param username: value compared against the ``username`` field of the
        ``spotify_users`` collection
    :return: the parsed ``User``, or None (after logging) when no document or
        more than one document matches
    """
    logger.info(f'retrieving {username}')

    query = db.collection(u'spotify_users').where(u'username', u'==', username)
    matches = list(query.stream())

    if not matches:
        logger.error(f'user {username} not found')
        return None

    if len(matches) != 1:
        # Usernames are expected to be unique; refuse to pick one arbitrarily.
        logger.critical(f"multiple {username}'s found")
        return None

    return parse_user_reference(user_snapshot=matches[0])
def parse_user_reference(user_ref=None, user_snapshot=None) -> User:
    """Build a ``User`` from a Firestore document reference and/or snapshot.

    Either argument may be omitted; the missing one is derived from the other
    (``snapshot.reference`` or ``ref.get()``).

    :param user_ref: document reference of the user, stored on the result as
        ``db_ref``
    :param user_snapshot: snapshot of the same document, used for field values
    :return: the populated ``User``
    :raises ValueError: when neither a reference nor a snapshot is supplied
    """
    if user_ref is None:
        if user_snapshot is None:
            raise ValueError('no user object supplied')
        user_ref = user_snapshot.reference
    elif user_snapshot is None:
        user_snapshot = user_ref.get()

    data = user_snapshot.to_dict()

    # Fields copied straight from the document under the same keyword name.
    passthrough = ('username', 'password', 'email', 'last_login', 'last_refreshed',
                   'locked', 'validated', 'spotify_linked', 'access_token',
                   'refresh_token', 'token_expiry', 'lastfm_username')
    fields = {key: data.get(key) for key in passthrough}

    # 'type' holds the name of a User.Type member and is resolved by lookup.
    return User(db_ref=user_ref,
                user_type=User.Type[data.get('type')],
                **fields)
def update_user(username: str, updates: dict) -> None:
    """Apply ``updates`` to the stored document of user ``username``.

    Nothing is written (and the problem is logged) unless exactly one
    document in ``spotify_users`` has a matching ``username`` field.

    :param username: username identifying the document to update
    :param updates: mapping passed unchanged to the document's ``update``
    """
    logger.debug(f'updating {username}')

    query = db.collection(u'spotify_users').where(u'username', u'==', username)
    matches = list(query.stream())
    match_count = len(matches)

    if match_count == 1:
        matches[0].reference.update(updates)
    elif match_count == 0:
        logger.error(f'user {username} not found')
    else:
        logger.critical(f"multiple {username}'s found")
def create_user(username: str, password: str):
    """Add a new user document to the ``spotify_users`` collection.

    The password is stored hashed via ``generate_password_hash``. The new
    record starts unlocked, validated, not linked to Spotify and with no
    tokens.

    :param username: username stored on the new document
    :param password: plain-text password; only its hash is stored
    :return: None
    """
    db.collection(u'spotify_users').add({
        'access_token': None,
        'email': None,
        # Timezone-aware UTC, matching the aware datetime.now(timezone.utc)
        # used for token-expiry comparison elsewhere in this module; the
        # naive datetime.utcnow() is deprecated.
        'last_login': datetime.now(timezone.utc),
        'last_refreshed': None,
        'locked': False,
        'password': generate_password_hash(password),
        'refresh_token': None,
        'spotify_linked': False,
        'type': 'user',
        'username': username,
        'validated': True
    })
# 2019-10-23 14:44:17 +01:00 (timestamp from the web view; not part of the code)
def get_user_playlists(username: str) -> List[Playlist]:
    """Return all playlists stored under user ``username``.

    :param username: username of the owning user
    :return: one parsed playlist per document in the user's ``playlists``
        sub-collection, or None (after logging) when the user is not found
    """
    logger.info(f'getting playlists for {username}')

    owner = get_user(username)
    if not owner:
        logger.error(f'user {username} not found')
        return None

    # Read every snapshot before parsing any of them.
    snapshots = list(owner.db_ref.collection(u'playlists').stream())
    return [parse_playlist_reference(username=username, playlist_snapshot=snapshot)
            for snapshot in snapshots]
def get_playlist(username: str = None, name: str = None) -> Optional[Playlist]:
    """Fetch the single playlist called ``name`` belonging to ``username``.

    :param username: username of the owning user
    :param name: value compared against the playlist's ``name`` field
    :return: the parsed playlist, or None (after logging) when the user is not
        found or when zero or several playlists match
    """
    logger.info(f'retrieving {name} for {username}')

    user = get_user(username)
    if not user:
        logger.error(f'user {username} not found')
        return None

    query = user.db_ref.collection(u'playlists').where(u'name', u'==', name)
    matches = list(query.stream())

    if not matches:
        logger.error(f'playlist {name} for {user} not found')
        return None

    if len(matches) != 1:
        logger.critical(f"multiple {name}'s for {user} found")
        return None

    return parse_playlist_reference(username=username, playlist_snapshot=matches[0])
def parse_playlist_reference(username, playlist_ref=None, playlist_snapshot=None) -> Playlist:
    """Build the playlist model matching a stored playlist document.

    Either argument may be omitted; the missing one is derived from the other
    (``snapshot.reference`` or ``ref.get()``). The document's ``type`` field
    selects the class: ``'default'`` -> ``Playlist``, ``'recents'`` ->
    ``RecentsPlaylist``, ``'fmchart'`` -> ``LastFMChartPlaylist``.

    :param username: owner's username, stored on the result
    :param playlist_ref: document reference of the playlist, stored on the
        result as ``db_ref``
    :param playlist_snapshot: snapshot of the same document, used for values
    :return: the populated playlist object, or None (after logging) when the
        document's ``type`` is not one of the three known values
    :raises ValueError: when neither a reference nor a snapshot is supplied
    """
    if playlist_ref is None and playlist_snapshot is None:
        raise ValueError('no playlist object supplied')

    if playlist_ref is None:
        playlist_ref = playlist_snapshot.reference

    if playlist_snapshot is None:
        playlist_snapshot = playlist_ref.get()

    playlist_dict = playlist_snapshot.to_dict()
    playlist_type = playlist_dict.get('type')

    # Reject unknown types before touching any other field so that a bad
    # 'sort' value on an unrecognised document cannot raise here.
    if playlist_type not in ('default', 'recents', 'fmchart'):
        logger.error(f'unknown playlist type {playlist_type} for {username}')
        return None

    # Keyword arguments shared by all three playlist classes.
    common = dict(
        uri=playlist_dict.get('uri'),
        name=playlist_dict.get('name'),
        username=username,

        db_ref=playlist_ref,

        include_recommendations=playlist_dict.get('include_recommendations', False),
        recommendation_sample=playlist_dict.get('recommendation_sample', 0),
        include_library_tracks=playlist_dict.get('include_library_tracks', False),

        parts=playlist_dict.get('parts'),
        playlist_references=playlist_dict.get('playlist_references'),
        shuffle=playlist_dict.get('shuffle'),

        # Stored as the name of a Sort member; a missing field falls back to
        # 'release_date'.
        sort=Sort[playlist_dict.get('sort', 'release_date')],

        description_overwrite=playlist_dict.get('description_overwrite'),
        description_suffix=playlist_dict.get('description_suffix'),

        lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0),
        lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0),
        lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0),

        lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0),
        lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0),
        lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0),

        lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'),
    )

    if playlist_type == 'default':
        return Playlist(**common)

    if playlist_type == 'recents':
        return RecentsPlaylist(**common,
                               add_last_month=playlist_dict.get('add_last_month'),
                               add_this_month=playlist_dict.get('add_this_month'),
                               day_boundary=playlist_dict.get('day_boundary'))

    # Only 'fmchart' remains; chart_range is stored as the name of an
    # FmNetwork.Range member.
    return LastFMChartPlaylist(**common,
                               chart_limit=playlist_dict.get('chart_limit'),
                               chart_range=FmNetwork.Range[playlist_dict.get('chart_range')])
# 2019-10-23 14:44:17 +01:00 (timestamp from the web view; not part of the code)
def update_playlist(username: str, name: str, updates: dict) -> None:
    """Apply ``updates`` to the playlist called ``name`` owned by ``username``.

    Nothing is written (and the reason is logged) when ``updates`` is empty,
    when the user cannot be found, or unless exactly one playlist document has
    a matching ``name`` field.

    :param username: username of the owning user
    :param name: value compared against the playlist's ``name`` field
    :param updates: mapping passed unchanged to the document's ``update``
    """
    if not updates:
        logger.debug(f'nothing to update for {name} for {username}')
        return None

    logger.debug(f'updating {name} for {username}')

    user = get_user(username)
    if not user:
        # get_user returns None for unknown or duplicated usernames; bail out
        # like get_playlist does instead of failing on user.db_ref.
        logger.error(f'user {username} not found')
        return None

    query = user.db_ref.collection(u'playlists').where(u'name', u'==', name)
    playlists = list(query.stream())

    if not playlists:
        logger.error(f'playlist {name} for {username} not found')
        return None

    if len(playlists) > 1:
        logger.critical(f"multiple {name}'s for {username} found")
        return None

    playlists[0].reference.update(updates)
def delete_playlist(username: str, name: str) -> None:
    """Delete the stored playlist called ``name`` owned by ``username``.

    The playlist is resolved through ``get_playlist``; when that yields
    nothing an error is logged and no delete is attempted.

    :param username: username of the owning user
    :param name: name of the playlist to delete
    """
    logger.info(f'deleting {name} for {username}')

    target = get_playlist(username=username, name=name)
    if not target:
        logger.error(f'playlist {name} not found for {username}')
        return

    target.db_ref.delete()
# 2020-02-03 09:44:33 +00:00 (timestamp from the web view; not part of the code)
def get_user_tags(username: str) -> List[Tag]:
    """Return all tags stored under user ``username``.

    :param username: username of the owning user
    :return: one parsed ``Tag`` per document in the user's ``tags``
        sub-collection, or None (after logging) when the user is not found
    """
    logger.info(f'getting tags for {username}')

    owner = get_user(username)
    if not owner:
        logger.error(f'user {username} not found')
        return None

    # Read every snapshot before parsing any of them.
    snapshots = list(owner.db_ref.collection(u'tags').stream())
    return [parse_tag_reference(username=username, tag_snapshot=snapshot)
            for snapshot in snapshots]
def get_tag(username: str = None, tag_id: str = None) -> Optional[Tag]:
    """Fetch the single tag with id ``tag_id`` belonging to ``username``.

    :param username: username of the owning user
    :param tag_id: value compared against the tag's ``tag_id`` field
    :return: the parsed ``Tag``, or None (after logging) when the user is not
        found or when zero or several tags match
    """
    logger.info(f'retrieving {tag_id} for {username}')

    user = get_user(username)
    if not user:
        logger.error(f'user {username} not found')
        return None

    query = user.db_ref.collection(u'tags').where(u'tag_id', u'==', tag_id)
    matches = list(query.stream())

    if not matches:
        logger.error(f'tag {tag_id} for {user} not found')
        return None

    if len(matches) != 1:
        logger.critical(f"multiple {tag_id}'s for {user} found")
        return None

    return parse_tag_reference(username=username, tag_snapshot=matches[0])
def parse_tag_reference(username, tag_ref=None, tag_snapshot=None) -> Tag:
    """Build a ``Tag`` from a Firestore document reference and/or snapshot.

    Either argument may be omitted; the missing one is derived from the other
    (``snapshot.reference`` or ``ref.get()``).

    :param username: owner's username, stored on the result
    :param tag_ref: document reference of the tag, stored on the result as
        ``db_ref``
    :param tag_snapshot: snapshot of the same document, used for field values
    :return: the populated ``Tag``
    :raises ValueError: when neither a reference nor a snapshot is supplied
    :raises KeyError: when the document has no ``tag_id`` field
    """
    if tag_ref is None:
        if tag_snapshot is None:
            raise ValueError('no tag object supplied')
        tag_ref = tag_snapshot.reference
    elif tag_snapshot is None:
        tag_snapshot = tag_ref.get()

    tag_dict = tag_snapshot.to_dict()

    # tag_id is the only mandatory field; it is read first with a plain lookup.
    tag_id = tag_dict['tag_id']

    # Optional fields paired with the fallback used when the document lacks them.
    optional_fields = (
        ('name', 'n/a'),
        ('tracks', []),
        ('albums', []),
        ('artists', []),
        ('count', 0),
        ('proportion', 0.0),
        ('total_user_scrobbles', 0),
        ('last_updated', None),
    )
    values = {field: tag_dict.get(field, fallback) for field, fallback in optional_fields}

    return Tag(tag_id=tag_id,
               username=username,
               db_ref=tag_ref,
               **values)
def update_tag(username: str, tag_id: str, updates: dict) -> None:
    """Apply ``updates`` to the tag with id ``tag_id`` owned by ``username``.

    Nothing is written (and the reason is logged) when ``updates`` is empty,
    when the user cannot be found, or unless exactly one tag document has a
    matching ``tag_id`` field.

    :param username: username of the owning user
    :param tag_id: value compared against the tag's ``tag_id`` field
    :param updates: mapping passed unchanged to the document's ``update``
    """
    if not updates:
        logger.debug(f'nothing to update for {tag_id} for {username}')
        return None

    logger.debug(f'updating {tag_id} for {username}')

    user = get_user(username)
    if not user:
        # get_user returns None for unknown or duplicated usernames; bail out
        # like get_tag does instead of failing on user.db_ref.
        logger.error(f'user {username} not found')
        return None

    query = user.db_ref.collection(u'tags').where(u'tag_id', u'==', tag_id)
    tags = list(query.stream())

    if not tags:
        logger.error(f'tag {tag_id} for {username} not found')
        return None

    if len(tags) > 1:
        logger.critical(f"multiple {tag_id}'s for {username} found")
        return None

    tags[0].reference.update(updates)
def delete_tag(username: str, tag_id: str) -> None:
    """Delete the stored tag with id ``tag_id`` owned by ``username``.

    The tag is resolved through ``get_tag``; when that yields nothing an
    error is logged and no delete is attempted.

    :param username: username of the owning user
    :param tag_id: id of the tag to delete
    """
    logger.info(f'deleting {tag_id} for {username}')

    tag = get_tag(username=username, tag_id=tag_id)

    if tag:
        tag.db_ref.delete()
    else:
        # The message previously said "playlist", copied from delete_playlist.
        logger.error(f'tag {tag_id} not found for {username}')