added orm

This commit is contained in:
aj 2019-10-23 14:44:17 +01:00
parent c3d1f9e9b0
commit 70b57a05c7
12 changed files with 872 additions and 214 deletions

View File

@ -8,7 +8,6 @@ import logging
from google.cloud import firestore from google.cloud import firestore
from google.cloud import tasks_v2 from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2 from google.protobuf import timestamp_pb2
from werkzeug.security import check_password_hash, generate_password_hash
from music.api.decorators import login_required, login_or_basic_auth, admin_required, gae_cron, cloud_task from music.api.decorators import login_required, login_or_basic_auth, admin_required, gae_cron, cloud_task
from music.tasks.run_user_playlist import run_user_playlist as run_user_playlist from music.tasks.run_user_playlist import run_user_playlist as run_user_playlist
@ -28,28 +27,17 @@ logger = logging.getLogger(__name__)
@blueprint.route('/playlists', methods=['GET']) @blueprint.route('/playlists', methods=['GET'])
@login_or_basic_auth @login_or_basic_auth
def get_playlists(username=None): def get_playlists(username=None):
return jsonify({
user_ref = database.get_user_doc_ref(username) 'playlists': [i.to_dict() for i in database.get_user_playlists(username)]
}), 200
playlists = user_ref.collection(u'playlists')
playlist_docs = [i.to_dict() for i in playlists.stream()]
for j in playlist_docs:
j['playlist_references'] = [i.get().to_dict().get('name', 'n/a')
for i in j['playlist_references']]
response = {
'playlists': playlist_docs
}
return jsonify(response), 200
@blueprint.route('/playlist', methods=['GET', 'POST', 'PUT', 'DELETE']) @blueprint.route('/playlist', methods=['GET', 'POST', 'PUT', 'DELETE'])
@login_or_basic_auth @login_or_basic_auth
def playlist(username=None): def playlist(username=None):
user_playlists = database.get_user_playlists(username)
user_ref = database.get_user_doc_ref(username) user_ref = database.get_user_doc_ref(username)
playlists = user_ref.collection(u'playlists') playlists = user_ref.collection(u'playlists')
@ -58,27 +46,16 @@ def playlist(username=None):
if playlist_name: if playlist_name:
queried_playlist = [i for i in playlists.where(u'name', u'==', playlist_name).stream()] queried_playlist = next((i for i in user_playlists if i.name == playlist_name), None)
if len(queried_playlist) == 0: if queried_playlist is None:
return jsonify({'error': 'no playlist found'}), 404 return jsonify({'error': 'no playlist found'}), 404
elif len(queried_playlist) > 1:
return jsonify({'error': 'multiple playlists found'}), 500
if request.method == "GET": if request.method == "GET":
return jsonify(queried_playlist.to_dict()), 200
playlist_doc = queried_playlist[0].to_dict()
playlist_doc['playlist_references'] = [i.get().to_dict().get('name', 'n/a')
for i in playlist_doc['playlist_references']]
return jsonify(playlist_doc), 200
elif request.method == 'DELETE': elif request.method == 'DELETE':
database.delete_playlist(username=username, name=playlist_name)
logger.info(f'deleted {username} / {queried_playlist[0].to_dict()["name"]}')
queried_playlist[0].reference.delete()
return jsonify({"message": 'playlist deleted', "status": "success"}), 200 return jsonify({"message": 'playlist deleted', "status": "success"}), 200
else: else:
@ -100,9 +77,10 @@ def playlist(username=None):
if request_json.get('playlist_references', None): if request_json.get('playlist_references', None):
if request_json['playlist_references'] != -1: if request_json['playlist_references'] != -1:
for i in request_json['playlist_references']: for i in request_json['playlist_references']:
retrieved_ref = database.get_user_playlist_ref_by_user_ref(user_ref, i)
if retrieved_ref: updating_playlist = database.get_playlist(username=username, name=i)
playlist_references.append(retrieved_ref) if updating_playlist is not None:
playlist_references.append(updating_playlist.db_ref)
else: else:
return jsonify({"message": f'managed playlist {i} not found', "status": "error"}), 400 return jsonify({"message": f'managed playlist {i} not found', "status": "error"}), 400
@ -127,9 +105,6 @@ def playlist(username=None):
if len(queried_playlist) != 0: if len(queried_playlist) != 0:
return jsonify({'error': 'playlist already exists'}), 400 return jsonify({'error': 'playlist already exists'}), 400
# if playlist_id is None or playlist_shuffle is None:
# return jsonify({'error': 'parts and id required'}), 400
from music.tasks.create_playlist import create_playlist as create_playlist from music.tasks.create_playlist import create_playlist as create_playlist
to_add = { to_add = {
@ -165,7 +140,7 @@ def playlist(username=None):
if len(queried_playlist) > 1: if len(queried_playlist) > 1:
return jsonify({'error': "multiple playlists exist"}), 500 return jsonify({'error': "multiple playlists exist"}), 500
playlist_doc = playlists.document(queried_playlist[0].id) updating_playlist = database.get_playlist(username=username, name=playlist_name)
dic = {} dic = {}
@ -209,7 +184,7 @@ def playlist(username=None):
logger.warning(f'no changes to make for {username} / {playlist_name}') logger.warning(f'no changes to make for {username} / {playlist_name}')
return jsonify({"message": 'no changes to make', "status": "error"}), 400 return jsonify({"message": 'no changes to make', "status": "error"}), 400
playlist_doc.update(dic) updating_playlist.update_database(dic)
logger.info(f'updated {username} / {playlist_name}') logger.info(f'updated {username} / {playlist_name}')
return jsonify({"message": 'playlist updated', "status": "success"}), 200 return jsonify({"message": 'playlist updated', "status": "success"}), 200
@ -221,21 +196,14 @@ def user(username=None):
if request.method == 'GET': if request.method == 'GET':
pulled_user = database.get_user_doc_ref(username).get().to_dict() database_user = database.get_user(username)
return jsonify(database_user.to_dict()), 200
response = {
'username': pulled_user['username'],
'type': pulled_user['type'],
'spotify_linked': pulled_user['spotify_linked'],
'validated': pulled_user['validated'],
'lastfm_username': pulled_user['lastfm_username']
}
return jsonify(response), 200
else: else:
if database.get_user_doc_ref(username).get().to_dict()['type'] != 'admin': db_user = database.get_user(username)
if db_user.user_type != db_user.Type.admin:
return jsonify({'status': 'error', 'message': 'unauthorized'}), 401 return jsonify({'status': 'error', 'message': 'unauthorized'}), 401
request_json = request.get_json() request_json = request.get_json()
@ -243,21 +211,16 @@ def user(username=None):
if 'username' in request_json: if 'username' in request_json:
username = request_json['username'] username = request_json['username']
actionable_user = database.get_user_doc_ref(username) actionable_user = database.get_user(username)
if actionable_user.get().exists is False:
return jsonify({"message": 'non-existent user', "status": "error"}), 400
dic = {}
if 'locked' in request_json: if 'locked' in request_json:
logger.info(f'updating lock {request_json["username"]} / {request_json["locked"]}') logger.info(f'updating lock {username} / {request_json["locked"]}')
dic['locked'] = request_json['locked'] actionable_user.locked = request_json['locked']
if 'spotify_linked' in request_json: if 'spotify_linked' in request_json:
logger.info(f'deauthing {request_json["username"]}') logger.info(f'deauthing {username}')
if request_json['spotify_linked'] is False: if request_json['spotify_linked'] is False:
dic.update({ actionable_user.update_database({
'access_token': None, 'access_token': None,
'refresh_token': None, 'refresh_token': None,
'spotify_linked': False 'spotify_linked': False
@ -265,13 +228,8 @@ def user(username=None):
if 'lastfm_username' in request_json: if 'lastfm_username' in request_json:
logger.info(f'updating lastfm username {username} -> {request_json["lastfm_username"]}') logger.info(f'updating lastfm username {username} -> {request_json["lastfm_username"]}')
dic['lastfm_username'] = request_json['lastfm_username'] actionable_user.lastfm_username = request_json['lastfm_username']
if len(dic) == 0:
logger.warning(f'no updates for {request_json["username"]}')
return jsonify({"message": 'no changes to make', "status": "error"}), 400
actionable_user.update(dic)
logger.info(f'updated {username}') logger.info(f'updated {username}')
return jsonify({'message': 'account updated', 'status': 'succeeded'}), 200 return jsonify({'message': 'account updated', 'status': 'succeeded'}), 200
@ -281,24 +239,9 @@ def user(username=None):
@login_or_basic_auth @login_or_basic_auth
@admin_required @admin_required
def users(username=None): def users(username=None):
return jsonify({
dic = { 'accounts': [i.to_dict() for i in database.get_users()]
'accounts': [] }), 200
}
for account in [i.to_dict() for i in db.collection(u'spotify_users').stream()]:
user_dic = {
'username': account['username'],
'type': account['type'],
'spotify_linked': account['spotify_linked'],
'locked': account['locked'],
'last_login': account['last_login']
}
dic['accounts'].append(user_dic)
return jsonify(dic), 200
@blueprint.route('/user/password', methods=['POST']) @blueprint.route('/user/password', methods=['POST'])
@ -315,15 +258,12 @@ def change_password(username=None):
if len(request_json['new_password']) > 30: if len(request_json['new_password']) > 30:
return jsonify({"error": 'password too long'}), 400 return jsonify({"error": 'password too long'}), 400
current_user = database.get_user_doc_ref(username) db_user = database.get_user(username)
if db_user.check_password(request_json['current_password']):
if check_password_hash(current_user.get().to_dict()['password'], request_json['current_password']): db_user.password = request_json['new_password']
current_user.update({'password': generate_password_hash(request_json['new_password'])})
logger.info(f'password udpated {username}') logger.info(f'password udpated {username}')
return jsonify({"message": 'password changed', "status": "success"}), 200 return jsonify({"message": 'password changed', "status": "success"}), 200
else: else:
logger.warning(f"incorrect password {username}") logger.warning(f"incorrect password {username}")
return jsonify({'error': 'wrong password provided'}), 401 return jsonify({'error': 'wrong password provided'}), 401
@ -489,48 +429,44 @@ def execute_all_users():
seconds_delay = 0 seconds_delay = 0
logger.info('running') logger.info('running')
for iter_user in [i.to_dict() for i in db.collection(u'spotify_users').stream()]: for iter_user in database.get_users():
if iter_user['spotify_linked'] and not iter_user['locked']: if iter_user.spotify_linked and not iter_user.locked:
task = { task = {
'app_engine_http_request': { # Specify the type of request. 'app_engine_http_request': { # Specify the type of request.
'http_method': 'POST', 'http_method': 'POST',
'relative_uri': '/api/playlist/run/user/task', 'relative_uri': '/api/playlist/run/user/task',
'body': iter_user['username'].encode() 'body': iter_user.username.encode()
} }
} }
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay) d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay)
# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp() timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d) timestamp.FromDatetime(d)
# Add the timestamp to the tasks.
task['schedule_time'] = timestamp task['schedule_time'] = timestamp
tasker.create_task(task_path, task) tasker.create_task(task_path, task)
seconds_delay += 30 seconds_delay += 30
def execute_user(username): def execute_user(username):
playlists = [i.to_dict() for i in playlists = database.get_user_playlists(username)
database.get_user_playlists_collection(database.get_user_query_stream(username)[0].id).stream()]
seconds_delay = 0 seconds_delay = 0
logger.info(f'running {username}') logger.info(f'running {username}')
for iterate_playlist in playlists: for iterate_playlist in playlists:
if len(iterate_playlist['parts']) > 0 or len(iterate_playlist['playlist_references']) > 0: if len(iterate_playlist.parts) > 0 or len(iterate_playlist.playlist_references) > 0:
if iterate_playlist.get('uri', None): if iterate_playlist.uri:
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD': if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
create_run_user_playlist_task(username, iterate_playlist['name'], seconds_delay) create_run_user_playlist_task(username, iterate_playlist.name, seconds_delay)
else: else:
run_playlist(username, iterate_playlist['name']) run_playlist(username, iterate_playlist.name)
seconds_delay += 6 seconds_delay += 6

View File

@ -18,7 +18,7 @@ def is_logged_in():
def is_basic_authed(): def is_basic_authed():
if request.authorization: if request.authorization:
if request.authorization.get('username', None) and request.authorization.get('password', None): if request.authorization.get('username', None) and request.authorization.get('password', None):
if database.check_user_password(request.authorization.username, request.authorization.password): if database.get_user(request.authorization.username).check_password(request.authorization.password):
return True return True
return False return False
@ -52,13 +52,13 @@ def login_or_basic_auth(func):
def admin_required(func): def admin_required(func):
@functools.wraps(func) @functools.wraps(func)
def admin_required_wrapper(*args, **kwargs): def admin_required_wrapper(*args, **kwargs):
user_dict = database.get_user_doc_ref(kwargs.get('username')).get().to_dict() db_user = database.get_user(kwargs.get('username'))
if user_dict: if db_user is not None:
if user_dict['type'] == 'admin': if db_user.user_type == db_user.Type.admin:
return func(*args, **kwargs) return func(*args, **kwargs)
else: else:
logger.warning(f'{user_dict["username"]} not authorized') logger.warning(f'{db_user.username} not authorized')
return jsonify({'status': 'error', 'message': 'unauthorized'}), 401 return jsonify({'status': 'error', 'message': 'unauthorized'}), 401
else: else:
logger.warning('user not logged in') logger.warning('user not logged in')
@ -70,13 +70,13 @@ def admin_required(func):
def spotify_link_required(func): def spotify_link_required(func):
@functools.wraps(func) @functools.wraps(func)
def spotify_link_required_wrapper(*args, **kwargs): def spotify_link_required_wrapper(*args, **kwargs):
user_dict = database.get_user_doc_ref(kwargs.get('username')).get().to_dict() db_user = database.get_user(kwargs.get('username'))
if user_dict: if db_user is not None:
if user_dict['spotify_linked']: if db_user.spotify_linked:
return func(*args, **kwargs) return func(*args, **kwargs)
else: else:
logger.warning(f'{user_dict["username"]} spotify not linked') logger.warning(f'{db_user.username} spotify not linked')
return jsonify({'status': 'error', 'message': 'spotify not linked'}), 401 return jsonify({'status': 'error', 'message': 'spotify not linked'}), 401
else: else:
logger.warning('user not logged in') logger.warning('user not logged in')
@ -88,13 +88,13 @@ def spotify_link_required(func):
def lastfm_username_required(func): def lastfm_username_required(func):
@functools.wraps(func) @functools.wraps(func)
def lastfm_username_required_wrapper(*args, **kwargs): def lastfm_username_required_wrapper(*args, **kwargs):
user_dict = database.get_user_doc_ref(kwargs.get('username')).get().to_dict() db_user = database.get_user(kwargs.get('username'))
if user_dict: if db_user is not None:
if user_dict.get('lastfm_username') and len(user_dict.get('lastfm_username')) > 0: if db_user.lastfm_username and len(db_user.lastfm_username) > 0:
return func(*args, **kwargs) return func(*args, **kwargs)
else: else:
logger.warning(f'no last.fm username for {user_dict["username"]}') logger.warning(f'no last.fm username for {db_user.username}')
return jsonify({'status': 'error', 'message': 'no last.fm username'}), 401 return jsonify({'status': 'error', 'message': 'no last.fm username'}), 401
else: else:
logger.warning('user not logged in') logger.warning('user not logged in')

View File

@ -184,42 +184,38 @@ def execute_all_users():
seconds_delay = 0 seconds_delay = 0
logger.info('running') logger.info('running')
for iter_user in [i.to_dict() for i in db.collection(u'spotify_users').stream()]: for iter_user in database.get_users():
if iter_user.get('spotify_linked') \ if iter_user.spotify_linked and iter_user.lastfm_username and \
and iter_user.get('lastfm_username') \ len(iter_user.lastfm_username) > 0 and not iter_user.locked:
and len(iter_user.get('lastfm_username')) > 0 \
and not iter_user['locked']:
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD': if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
create_refresh_user_task(username=iter_user.get('username'), delay=seconds_delay) create_refresh_user_task(username=iter_user.username, delay=seconds_delay)
else: else:
execute_user(username=iter_user.get('username')) execute_user(username=iter_user.username)
seconds_delay += 2400 seconds_delay += 2400
else: else:
logger.debug(f'skipping {iter_user.get("username")}') logger.debug(f'skipping {iter_user.username}')
def execute_user(username): def execute_user(username):
playlists = [i.to_dict() for i in playlists = database.get_user_playlists(username)
database.get_user_playlists_collection(database.get_user_query_stream(username)[0].id).stream()] user = database.get_user(username)
seconds_delay = 0 seconds_delay = 0
logger.info(f'running {username}') logger.info(f'running {username}')
user = database.get_user_doc_ref(username).get().to_dict() if user.lastfm_username and len(user.lastfm_username) > 0:
for playlist in playlists:
if user.get('lastfm_username') and len(user.get('lastfm_username')) > 0: if playlist.uri:
for iterate_playlist in playlists:
if iterate_playlist.get('uri', None):
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD': if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
create_refresh_playlist_task(username, iterate_playlist['name'], seconds_delay) create_refresh_playlist_task(username, playlist.name, seconds_delay)
else: else:
refresh_lastfm_track_stats(username, iterate_playlist['name']) refresh_lastfm_track_stats(username, playlist.name)
seconds_delay += 1200 seconds_delay += 1200
else: else:
@ -239,11 +235,9 @@ def create_refresh_user_task(username, delay=0):
if delay > 0: if delay > 0:
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay) d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp() timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d) timestamp.FromDatetime(d)
# Add the timestamp to the tasks.
task['schedule_time'] = timestamp task['schedule_time'] = timestamp
tasker.create_task(task_path, task) tasker.create_task(task_path, task)

View File

@ -7,6 +7,8 @@ from werkzeug.security import check_password_hash
from spotframework.net.network import Network as SpotifyNetwork from spotframework.net.network import Network as SpotifyNetwork
from fmframework.net.network import Network as FmNetwork from fmframework.net.network import Network as FmNetwork
from music.db.user import DatabaseUser from music.db.user import DatabaseUser
from music.model.user import User
from music.model.playlist import Playlist, RecentsPlaylist, Sort
db = firestore.Client() db = firestore.Client()
@ -166,3 +168,211 @@ def get_user_playlist_ref_by_user_ref(user_ref: firestore.DocumentReference,
else: else:
logger.error(f'{username} playlist collection not found, looking up {playlist}') logger.error(f'{username} playlist collection not found, looking up {playlist}')
return None return None
def get_users() -> List[User]:
logger.info('retrieving users')
return [parse_user_reference(user_snapshot=i) for i in db.collection(u'spotify_users').stream()]
def get_user(username: str) -> Optional[User]:
logger.info(f'retrieving {username}')
users = [i for i in db.collection(u'spotify_users').where(u'username', u'==', username).stream()]
if len(users) == 0:
logger.error(f'user {username} not found')
return None
if len(users) > 1:
logger.critical(f"multiple {username}'s found")
return None
return parse_user_reference(user_snapshot=users[0])
def parse_user_reference(user_ref=None, user_snapshot=None) -> User:
if user_ref is None and user_snapshot is None:
raise ValueError('no user object supplied')
if user_ref is None:
user_ref = user_snapshot.reference
if user_snapshot is None:
user_snapshot = user_ref.get()
user_dict = user_snapshot.to_dict()
return User(username=user_dict.get('username'),
password=user_dict.get('password'),
db_ref=user_ref,
email=user_dict.get('email'),
user_type=User.Type[user_dict.get('type')],
last_login=user_dict.get('last_login'),
last_refreshed=user_dict.get('last_refreshed'),
locked=user_dict.get('locked'),
validated=user_dict.get('validated'),
spotify_linked=user_dict.get('spotify_linked'),
access_token=user_dict.get('access_token'),
refresh_token=user_dict.get('refresh_token'),
token_expiry=user_dict.get('token_expiry'),
lastfm_username=user_dict.get('lastfm_username'))
def update_user(username: str, updates: dict) -> None:
logger.debug(f'updating {username}')
users = [i for i in db.collection(u'spotify_users').where(u'username', u'==', username).stream()]
if len(users) == 0:
logger.error(f'user {username} not found')
return None
if len(users) > 1:
logger.critical(f"multiple {username}'s found")
return None
user = users[0].reference
user.update(updates)
def get_user_playlists(username: str) -> List[Playlist]:
logger.info(f'getting playlists for {username}')
user = get_user(username)
if user:
playlist_refs = [i for i in user.db_ref.collection(u'playlists').stream()]
return [parse_playlist_reference(username=username, playlist_snapshot=i) for i in playlist_refs]
else:
logger.error(f'user {username} not found')
def get_playlist(username: str = None, name: str = None) -> Optional[Playlist]:
logger.info(f'retrieving {name} for {username}')
user = get_user(username)
if user:
playlists = [i for i in user.db_ref.collection(u'playlists').where(u'name', u'==', name).stream()]
if len(playlists) == 0:
logger.error(f'playlist {name} for {user} not found')
return None
if len(playlists) > 1:
logger.critical(f"multiple {name}'s for {user} found")
return None
return parse_playlist_reference(username=username, playlist_snapshot=playlists[0])
else:
logger.error(f'user {username} not found')
def parse_playlist_reference(username, playlist_ref=None, playlist_snapshot=None) -> Playlist:
if playlist_ref is None and playlist_snapshot is None:
raise ValueError('no playlist object supplied')
if playlist_ref is None:
playlist_ref = playlist_snapshot.reference
if playlist_snapshot is None:
playlist_snapshot = playlist_ref.get()
playlist_dict = playlist_snapshot.to_dict()
if playlist_dict.get('type') == 'default':
return Playlist(uri=playlist_dict.get('uri'),
name=playlist_dict.get('name'),
username=username,
db_ref=playlist_ref,
include_recommendations=playlist_dict.get('include_recommendations', False),
recommendation_sample=playlist_dict.get('recommendation_sample', 0),
include_library_tracks=playlist_dict.get('include_library_tracks', False),
parts=playlist_dict.get('parts'),
playlist_references=playlist_dict.get('playlist_references'),
shuffle=playlist_dict.get('shuffle'),
sort=Sort[playlist_dict.get('sort', 'release_date')],
description_overwrite=playlist_dict.get('description_overwrite'),
description_suffix=playlist_dict.get('description_suffix'),
lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0),
lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0),
lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0),
lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0),
lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0),
lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0),
lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'))
elif playlist_dict.get('type') == 'recents':
return RecentsPlaylist(uri=playlist_dict.get('uri'),
name=playlist_dict.get('name'),
username=username,
db_ref=playlist_ref,
include_recommendations=playlist_dict.get('include_recommendations', False),
recommendation_sample=playlist_dict.get('recommendation_sample', 0),
include_library_tracks=playlist_dict.get('include_library_tracks', False),
parts=playlist_dict.get('parts'),
playlist_references=playlist_dict.get('playlist_references'),
shuffle=playlist_dict.get('shuffle'),
sort=Sort[playlist_dict.get('sort', 'release_date')],
description_overwrite=playlist_dict.get('description_overwrite'),
description_suffix=playlist_dict.get('description_suffix'),
lastfm_stat_count=playlist_dict.get('lastfm_stat_count', 0),
lastfm_stat_album_count=playlist_dict.get('lastfm_stat_album_count', 0),
lastfm_stat_artist_count=playlist_dict.get('lastfm_stat_artist_count', 0),
lastfm_stat_percent=playlist_dict.get('lastfm_stat_percent', 0),
lastfm_stat_album_percent=playlist_dict.get('lastfm_stat_album_percent', 0),
lastfm_stat_artist_percent=playlist_dict.get('lastfm_stat_artist_percent', 0),
lastfm_stat_last_refresh=playlist_dict.get('lastfm_stat_last_refresh'),
add_last_month=playlist_dict.get('add_last_month'),
add_this_month=playlist_dict.get('add_this_month'),
day_boundary=playlist_dict.get('day_boundary'))
def update_playlist(username: str, name: str, updates: dict) -> None:
if len(updates) > 0:
logger.debug(f'updating {name} for {username}')
user = get_user(username)
playlists = [i for i in user.db_ref.collection(u'playlists').where(u'name', u'==', name).stream()]
if len(playlists) == 0:
logger.error(f'playlist {name} for {username} not found')
return None
if len(playlists) > 1:
logger.critical(f"multiple {name}'s for {username} found")
return None
playlist = playlists[0].reference
playlist.update(updates)
else:
logger.debug(f'nothing to update for {name} for {username}')
def delete_playlist(username: str, name: str) -> None:
logger.info(f'deleting {name} for {username}')
playlist = get_playlist(username=username, name=name)
if playlist:
playlist.db_ref.delete()
else:
logger.error(f'playlist {name} not found for {username}')

View File

@ -1,5 +1,6 @@
from google.cloud import firestore from google.cloud import firestore
import music.db.database as database import music.db.database as database
from music.model.user import User
import logging import logging
db = firestore.Client() db = firestore.Client()
@ -8,16 +9,16 @@ logger = logging.getLogger(__name__)
class PartGenerator: class PartGenerator:
def __init__(self, user_id=None, username=None): def __init__(self, user: User, username=None):
self.queried_playlists = [] self.queried_playlists = []
self.parts = [] self.parts = []
if user_id: if user:
self.user_id = user_id self.user = user
elif username: elif username:
user_doc = database.get_user_doc_ref(username) pulled_user = database.get_user(username)
if user_doc: if pulled_user:
self.user_id = user_doc.id self.user = pulled_user
else: else:
raise LookupError(f'{username} not found') raise LookupError(f'{username} not found')
else: else:
@ -28,7 +29,7 @@ class PartGenerator:
self.parts = [] self.parts = []
def get_recursive_parts(self, name): def get_recursive_parts(self, name):
logger.info(f'getting part from {name} for {self.user_id}') logger.info(f'getting part from {name} for {self.user.username}')
self.reset() self.reset()
self.process_reference_by_name(name) self.process_reference_by_name(name)
@ -37,27 +38,21 @@ class PartGenerator:
def process_reference_by_name(self, name): def process_reference_by_name(self, name):
playlist_query = [i for i in playlist = database.get_playlist(username=self.user.username, name=name)
database.get_user_playlists_collection(self.user_id).where(u'name', u'==', name).stream()]
if len(playlist_query) > 0: if playlist is not None:
if len(playlist_query) == 1:
if playlist_query[0].id not in self.queried_playlists: if playlist.db_ref.id not in self.queried_playlists:
playlist_doc = playlist_query[0].to_dict() self.parts += playlist.parts
self.parts += playlist_doc['parts']
for i in playlist_doc['playlist_references']: for i in playlist.playlist_references:
if i.id not in self.queried_playlists: if i.id not in self.queried_playlists:
self.process_reference_by_reference(i) self.process_reference_by_reference(i)
else: else:
logger.warning(f'playlist reference {name} already queried') logger.warning(f'playlist reference {name} already queried')
else:
logger.warning(f"multiple {name}'s found")
else: else:
logger.warning(f'playlist reference {name} not found') logger.warning(f'playlist reference {name} not found')

0
music/model/__init__.py Normal file
View File

357
music/model/playlist.py Normal file
View File

@ -0,0 +1,357 @@
from typing import List
from enum import Enum
from datetime import datetime
from google.cloud.firestore import DocumentReference
import music.db.database as database
class Sort(Enum):
shuffle = 1
release_date = 2
class Playlist:
def __init__(self,
uri: str,
name: str,
username: str,
db_ref: DocumentReference,
include_recommendations: bool,
recommendation_sample: int,
include_library_tracks: bool,
parts: List[str],
playlist_references: List[DocumentReference],
shuffle: bool,
sort: Sort = None,
description_overwrite: str = None,
description_suffix: str = None,
lastfm_stat_count: int = None,
lastfm_stat_album_count: int = None,
lastfm_stat_artist_count: int = None,
lastfm_stat_percent: int = None,
lastfm_stat_album_percent: int = None,
lastfm_stat_artist_percent: int = None,
lastfm_stat_last_refresh: datetime = None):
self._uri = uri
self.name = name
self.username = username
self.db_ref = db_ref
self._include_recommendations = include_recommendations
self._recommendation_sample = recommendation_sample
self._include_library_tracks = include_library_tracks
self._parts = parts
self._playlist_references = playlist_references
self._shuffle = shuffle
self._sort = sort
self._description_overwrite = description_overwrite
self._description_suffix = description_suffix
self._lastfm_stat_count = lastfm_stat_count
self._lastfm_stat_album_count = lastfm_stat_album_count
self._lastfm_stat_artist_count = lastfm_stat_artist_count
self._lastfm_stat_percent = lastfm_stat_percent
self._lastfm_stat_album_percent = lastfm_stat_album_percent
self._lastfm_stat_artist_percent = lastfm_stat_artist_percent
self._lastfm_stat_last_refresh = lastfm_stat_last_refresh
def to_dict(self):
return {
'uri': self.uri,
'name': self.name,
'include_recommendations': self.include_recommendations,
'recommendation_sample': self.recommendation_sample,
'include_library_tracks': self.include_library_tracks,
'parts': self.parts,
'playlist_references': [i.get().to_dict().get('name') for i in self.playlist_references],
'shuffle': self.shuffle,
'sort': self.sort.name,
'description_overwrite': self.description_overwrite,
'description_suffix': self.description_suffix,
'lastfm_stat_count': self.lastfm_stat_count,
'lastfm_stat_album_count': self.lastfm_stat_album_count,
'lastfm_stat_artist_count': self.lastfm_stat_artist_count,
'lastfm_stat_percent': self.lastfm_stat_percent,
'lastfm_stat_album_percent': self.lastfm_stat_album_percent,
'lastfm_stat_artist_percent': self.lastfm_stat_artist_percent,
'lastfm_stat_last_refresh': self.lastfm_stat_last_refresh
}
def update_database(self, updates):
database.update_playlist(username=self.username, name=self.name, updates=updates)
@property
def uri(self):
return self._uri
@uri.setter
def uri(self, value):
database.update_playlist(self.username, self.name, {'uri': value})
self._uri = value
@property
def include_recommendations(self):
return self._include_recommendations
@include_recommendations.setter
def include_recommendations(self, value):
database.update_playlist(self.username, self.name, {'include_recommendations': value})
self._include_recommendations = value
@property
def recommendation_sample(self):
return self._recommendation_sample
@recommendation_sample.setter
def recommendation_sample(self, value):
database.update_playlist(self.username, self.name, {'recommendation_sample': value})
self._recommendation_sample = value
@property
def include_library_tracks(self):
return self._include_library_tracks
@include_library_tracks.setter
def include_library_tracks(self, value):
database.update_playlist(self.username, self.name, {'include_library_tracks': value})
self._include_library_tracks = value
@property
def parts(self):
return self._parts
@parts.setter
def parts(self, value):
database.update_playlist(self.username, self.name, {'parts': value})
self._parts = value
@property
def playlist_references(self):
return self._playlist_references
@playlist_references.setter
def playlist_references(self, value):
database.update_playlist(self.username, self.name, {'playlist_references': value})
self._playlist_references = value
@property
def shuffle(self):
return self._shuffle
@shuffle.setter
def shuffle(self, value):
database.update_playlist(self.username, self.name, {'shuffle': value})
self._shuffle = value
@property
def sort(self):
return self._sort
@sort.setter
def sort(self, value):
database.update_playlist(self.username, self.name, {'sort': value.name})
self._sort = value
@property
def description_overwrite(self):
return self._description_overwrite
@description_overwrite.setter
def description_overwrite(self, value):
database.update_playlist(self.username, self.name, {'description_overwrite': value})
self._description_overwrite = value
@property
def description_suffix(self):
return self._description_suffix
@description_suffix.setter
def description_suffix(self, value):
database.update_playlist(self.username, self.name, {'description_suffix': value})
self._description_suffix = value
@property
def lastfm_stat_count(self):
return self._lastfm_stat_count
@lastfm_stat_count.setter
def lastfm_stat_count(self, value):
database.update_playlist(self.username, self.name, {'lastfm_stat_count': value})
self._lastfm_stat_count = value
@property
def lastfm_stat_album_count(self):
return self._lastfm_stat_album_count
@lastfm_stat_album_count.setter
def lastfm_stat_album_count(self, value):
database.update_playlist(self.username, self.name, {'lastfm_stat_album_count': value})
self._lastfm_stat_album_count = value
@property
def lastfm_stat_artist_count(self):
return self._lastfm_stat_artist_count
@lastfm_stat_artist_count.setter
def lastfm_stat_artist_count(self, value):
database.update_playlist(self.username, self.name, {'lastfm_stat_artist_count': value})
self._lastfm_stat_artist_count = value
@property
def lastfm_stat_percent(self):
return self._lastfm_stat_percent
@lastfm_stat_percent.setter
def lastfm_stat_percent(self, value):
database.update_playlist(self.username, self.name, {'lastfm_stat_percent': value})
self._lastfm_stat_percent = value
@property
def lastfm_stat_album_percent(self):
return self._lastfm_stat_album_percent
@lastfm_stat_album_percent.setter
def lastfm_stat_album_percent(self, value):
database.update_playlist(self.username, self.name, {'lastfm_stat_album_percent': value})
self._lastfm_stat_album_percent = value
@property
def lastfm_stat_artist_percent(self):
return self._lastfm_stat_artist_percent
@lastfm_stat_artist_percent.setter
def lastfm_stat_artist_percent(self, value):
database.update_playlist(self.username, self.name, {'lastfm_stat_artist_percent': value})
self._lastfm_stat_artist_percent = value
@property
def lastfm_stat_last_refresh(self):
return self._lastfm_stat_last_refresh
@lastfm_stat_last_refresh.setter
def lastfm_stat_last_refresh(self, value):
database.update_playlist(self.username, self.name, {'lastfm_stat_last_refresh': value})
self._lastfm_stat_last_refresh = value
class RecentsPlaylist(Playlist):
def __init__(self,
uri: str,
name: str,
username: str,
db_ref: DocumentReference,
include_recommendations: bool,
recommendation_sample: int,
include_library_tracks: bool,
parts: List[str],
playlist_references: List[DocumentReference],
shuffle: bool,
sort: Sort = None,
description_overwrite: str = None,
description_suffix: str = None,
lastfm_stat_count: int = None,
lastfm_stat_album_count: int = None,
lastfm_stat_artist_count: int = None,
lastfm_stat_percent: int = None,
lastfm_stat_album_percent: int = None,
lastfm_stat_artist_percent: int = None,
lastfm_stat_last_refresh: datetime = None,
add_last_month: bool = False,
add_this_month: bool = False,
day_boundary: int = 7):
super().__init__(uri=uri,
name=name,
username=username,
db_ref=db_ref,
include_recommendations=include_recommendations,
recommendation_sample=recommendation_sample,
include_library_tracks=include_library_tracks,
parts=parts,
playlist_references=playlist_references,
shuffle=shuffle,
sort=sort,
description_overwrite=description_overwrite,
description_suffix=description_suffix,
lastfm_stat_count=lastfm_stat_count,
lastfm_stat_album_count=lastfm_stat_album_count,
lastfm_stat_artist_count=lastfm_stat_artist_count,
lastfm_stat_percent=lastfm_stat_percent,
lastfm_stat_album_percent=lastfm_stat_album_percent,
lastfm_stat_artist_percent=lastfm_stat_artist_percent,
lastfm_stat_last_refresh=lastfm_stat_last_refresh)
self._add_last_month = add_last_month
self._add_this_month = add_this_month
self._day_boundary = day_boundary
def to_dict(self):
response = super().to_dict()
response.update({
'add_last_month': self.add_last_month,
'add_this_month': self.add_this_month,
'day_boundary': self.day_boundary
})
return response
@property
def add_last_month(self):
return self._add_last_month
@add_last_month.setter
def add_last_month(self, value):
database.update_playlist(self.username, self.name, {'add_last_month': value})
self._add_last_month = value
@property
def add_this_month(self):
return self._add_this_month
@add_this_month.setter
def add_this_month(self, value):
database.update_playlist(self.username, self.name, {'add_this_month': value})
self._add_this_month = value
@property
def day_boundary(self):
return self._day_boundary
@day_boundary.setter
def day_boundary(self, value):
database.update_playlist(self.username, self.name, {'day_boundary': value})
self._day_boundary = value

172
music/model/user.py Normal file
View File

@ -0,0 +1,172 @@
from datetime import datetime
from enum import Enum
from werkzeug.security import generate_password_hash, check_password_hash
import music.db.database as database
class User:
class Type(Enum):
user = 1
admin = 2
def __init__(self,
username: str,
password: str,
db_ref,
email: str,
user_type: Type,
last_login: datetime,
last_refreshed: datetime,
locked: bool,
validated: bool,
spotify_linked: bool,
access_token: str,
refresh_token: str,
token_expiry: int,
lastfm_username: str = None):
self.username = username
self._password = password
self.db_ref = db_ref
self._email = email
self._type = user_type
self._last_login = last_login
self._last_refreshed = last_refreshed
self._locked = locked
self._validated = validated
self._spotify_linked = spotify_linked
self._access_token = access_token
self._refresh_token = refresh_token
self._token_expiry = token_expiry
self._lastfm_username = lastfm_username
def check_password(self, password):
return check_password_hash(self.password, password)
def to_dict(self):
return {
'username': self.username,
'email': self.email,
'type': self.user_type.name,
'last_login': self.last_login,
'spotify_linked': self.spotify_linked,
'lastfm_username': self.lastfm_username
}
def update_database(self, updates):
database.update_user(username=self.username, updates=updates)
@property
def password(self):
return self._password
@password.setter
def password(self, value):
pw_hash = generate_password_hash(value)
database.update_user(self.username, {'password': pw_hash})
self._password = pw_hash
@property
def email(self):
return self._email
@email.setter
def email(self, value):
database.update_user(self.username, {'email': value})
self._email = value
@property
def user_type(self):
return self._type
@user_type.setter
def user_type(self, value):
database.update_user(self.username, {'type': value})
self._type = value
@property
def last_login(self):
return self._last_login
@last_login.setter
def last_login(self, value):
database.update_user(self.username, {'last_login': value})
self._last_login = value
@property
def last_refreshed(self):
return self._last_refreshed
@last_refreshed.setter
def last_refreshed(self, value):
database.update_user(self.username, {'last_refreshed': value})
self._last_refreshed = value
@property
def locked(self):
return self._locked
@locked.setter
def locked(self, value):
database.update_user(self.username, {'locked': value})
self._locked = value
@property
def validated(self):
return self._validated
@validated.setter
def validated(self, value):
database.update_user(self.username, {'validated': value})
self._validated = value
@property
def spotify_linked(self):
return self._spotify_linked
@spotify_linked.setter
def spotify_linked(self, value):
database.update_user(self.username, {'spotify_linked': value})
self._spotify_linked = value
@property
def access_token(self):
return self._access_token
@access_token.setter
def access_token(self, value):
database.update_user(self.username, {'access_token': value})
self._access_token = value
@property
def refresh_token(self):
return self._refresh_token
@refresh_token.setter
def refresh_token(self, value):
database.update_user(self.username, {'refresh_token': value})
self._refresh_token = value
@property
def token_expiry(self):
return self._token_expiry
@token_expiry.setter
def token_expiry(self, value):
database.update_user(self.username, {'refresh_token': value})
self._token_expiry = value
@property
def lastfm_username(self):
return self._lastfm_username
@lastfm_username.setter
def lastfm_username(self, value):
database.update_user(self.username, {'lastfm_username': value})
self._lastfm_username = value

View File

@ -10,13 +10,10 @@ logger = logging.getLogger(__name__)
def create_playlist(username, name): def create_playlist(username, name):
logger.info(f'creating {username} / {name}') logger.info(f'creating {username} / {name}')
users = [i for i in db.collection(u'spotify_users').where(u'username', u'==', username).stream()] user = database.get_user(username)
if user is not None:
if len(users) == 1:
net = database.get_authed_spotify_network(username) net = database.get_authed_spotify_network(username)
playlist = net.create_playlist(net.user.username, name) playlist = net.create_playlist(net.user.username, name)
@ -28,5 +25,5 @@ def create_playlist(username, name):
return None return None
else: else:
logger.error(f'{len(users)} users found') logger.error(f'{username} not found')
return None return None

View File

@ -29,11 +29,11 @@ def play_user_playlist(username,
add_last_month=False, add_last_month=False,
device_name=None): device_name=None):
users = database.get_user_query_stream(username) user = database.get_user(username)
logger.info(f'playing for {username}') logger.info(f'playing for {username}')
if len(users) == 1: if user:
if parts is None and playlists is None: if parts is None and playlists is None:
logger.critical(f'no playlists to use for creation ({username})') logger.critical(f'no playlists to use for creation ({username})')
@ -74,7 +74,7 @@ def play_user_playlist(username,
submit_parts = parts submit_parts = parts
part_generator = PartGenerator(user_id=users[0].id) part_generator = PartGenerator(user=user)
for part in playlists: for part in playlists:
submit_parts += part_generator.get_recursive_parts(part) submit_parts += part_generator.get_recursive_parts(part)
@ -100,5 +100,4 @@ def play_user_playlist(username,
player.play(tracks=tracks, device=device) player.play(tracks=tracks, device=device)
else: else:
logger.critical(f'multiple/no user(s) found ({username})') logger.critical(f'{username} not found')
return None

View File

@ -21,17 +21,18 @@ def refresh_lastfm_track_stats(username, playlist_name):
spotnet = database.get_authed_spotify_network(username=username) spotnet = database.get_authed_spotify_network(username=username)
counter = Counter(fmnet=fmnet, spotnet=spotnet) counter = Counter(fmnet=fmnet, spotnet=spotnet)
database_ref = database.get_user_playlist_ref_by_username(user=username, playlist=playlist_name) playlist = database.get_playlist(username=username, name=playlist_name)
playlist_dict = database_ref.get().to_dict() spotify_playlist = spotnet.get_playlist(uri=Uri(playlist.uri))
spotify_playlist = spotnet.get_playlist(uri=Uri(playlist_dict['uri']))
track_count = counter.count_playlist(playlist=spotify_playlist) track_count = counter.count_playlist(playlist=spotify_playlist)
user_count = fmnet.get_user_scrobble_count() user_count = fmnet.get_user_scrobble_count()
if user_count > 0:
percent = round((track_count * 100) / user_count, 2) percent = round((track_count * 100) / user_count, 2)
else:
percent = 0
database_ref.update({ playlist.update_database({
'lastfm_stat_count': track_count, 'lastfm_stat_count': track_count,
'lastfm_stat_percent': percent, 'lastfm_stat_percent': percent,
@ -47,17 +48,18 @@ def refresh_lastfm_album_stats(username, playlist_name):
spotnet = database.get_authed_spotify_network(username=username) spotnet = database.get_authed_spotify_network(username=username)
counter = Counter(fmnet=fmnet, spotnet=spotnet) counter = Counter(fmnet=fmnet, spotnet=spotnet)
database_ref = database.get_user_playlist_ref_by_username(user=username, playlist=playlist_name) playlist = database.get_playlist(username=username, name=playlist_name)
playlist_dict = database_ref.get().to_dict() spotify_playlist = spotnet.get_playlist(uri=Uri(playlist.uri))
spotify_playlist = spotnet.get_playlist(uri=Uri(playlist_dict['uri']))
album_count = counter.count_playlist(playlist=spotify_playlist, query_album=True) album_count = counter.count_playlist(playlist=spotify_playlist, query_album=True)
user_count = fmnet.get_user_scrobble_count() user_count = fmnet.get_user_scrobble_count()
if user_count > 0:
album_percent = round((album_count * 100) / user_count, 2) album_percent = round((album_count * 100) / user_count, 2)
else:
album_percent = 0
database_ref.update({ playlist.update_database({
'lastfm_stat_album_count': album_count, 'lastfm_stat_album_count': album_count,
'lastfm_stat_album_percent': album_percent, 'lastfm_stat_album_percent': album_percent,
@ -73,17 +75,18 @@ def refresh_lastfm_artist_stats(username, playlist_name):
spotnet = database.get_authed_spotify_network(username=username) spotnet = database.get_authed_spotify_network(username=username)
counter = Counter(fmnet=fmnet, spotnet=spotnet) counter = Counter(fmnet=fmnet, spotnet=spotnet)
database_ref = database.get_user_playlist_ref_by_username(user=username, playlist=playlist_name) playlist = database.get_playlist(username=username, name=playlist_name)
playlist_dict = database_ref.get().to_dict() spotify_playlist = spotnet.get_playlist(uri=Uri(playlist.uri))
spotify_playlist = spotnet.get_playlist(uri=Uri(playlist_dict['uri']))
artist_count = counter.count_playlist(playlist=spotify_playlist, query_artist=True) artist_count = counter.count_playlist(playlist=spotify_playlist, query_artist=True)
user_count = fmnet.get_user_scrobble_count() user_count = fmnet.get_user_scrobble_count()
if user_count > 0:
artist_percent = round((artist_count * 100) / user_count, 2) artist_percent = round((artist_count * 100) / user_count, 2)
else:
artist_percent = 0
database_ref.update({ playlist.update_database({
'lastfm_stat_artist_count': artist_count, 'lastfm_stat_artist_count': artist_count,
'lastfm_stat_artist_percent': artist_percent, 'lastfm_stat_artist_percent': artist_percent,

View File

@ -12,6 +12,7 @@ from spotframework.model.uri import Uri
import music.db.database as database import music.db.database as database
from music.db.part_generator import PartGenerator from music.db.part_generator import PartGenerator
from music.model.playlist import RecentsPlaylist
db = firestore.Client() db = firestore.Client()
@ -19,26 +20,21 @@ logger = logging.getLogger(__name__)
def run_user_playlist(username, playlist_name): def run_user_playlist(username, playlist_name):
user = database.get_user(username)
users = database.get_user_query_stream(username)
logger.info(f'running {username} / {playlist_name}') logger.info(f'running {username} / {playlist_name}')
if len(users) == 1: if user:
playlist_collection = db.collection(u'spotify_users', u'{}'.format(users[0].id), 'playlists') playlist = database.get_playlist(username=username, name=playlist_name)
playlists = [i for i in playlist_collection.where(u'name', u'==', playlist_name).stream()] if playlist is not None:
if len(playlists) == 1: if playlist.uri is None:
playlist_dict = playlists[0].to_dict()
if playlist_dict['uri'] is None:
logger.critical(f'no playlist id to populate ({username}/{playlist_name})') logger.critical(f'no playlist id to populate ({username}/{playlist_name})')
return None return None
if len(playlist_dict['parts']) == 0 and len(playlist_dict['playlist_references']) == 0: if len(playlist.parts) == 0 and len(playlist.playlist_references) == 0:
logger.critical(f'no playlists to use for creation ({username}/{playlist_name})') logger.critical(f'no playlists to use for creation ({username}/{playlist_name})')
return None return None
@ -48,50 +44,49 @@ def run_user_playlist(username, playlist_name):
processors = [DeduplicateByID()] processors = [DeduplicateByID()]
if playlist_dict['shuffle'] is True: if playlist.shuffle is True:
processors.append(Shuffle()) processors.append(Shuffle())
else: else:
processors.append(SortReleaseDate(reverse=True)) processors.append(SortReleaseDate(reverse=True))
part_generator = PartGenerator(user_id=users[0].id) part_generator = PartGenerator(user=user)
submit_parts = part_generator.get_recursive_parts(playlist_dict['name']) submit_parts = part_generator.get_recursive_parts(playlist.name)
params = [ params = [
PlaylistSource.Params(names=submit_parts) PlaylistSource.Params(names=submit_parts)
] ]
if playlist_dict['include_recommendations']: if playlist.include_recommendations:
params.append(RecommendationSource.Params(recommendation_limit=playlist_dict['recommendation_sample'])) params.append(RecommendationSource.Params(recommendation_limit=playlist.recommendation_sample))
if playlist_dict.get('include_library_tracks', False): if playlist.include_library_tracks:
params.append(LibraryTrackSource.Params()) params.append(LibraryTrackSource.Params())
if playlist_dict['type'] == 'recents': if isinstance(playlist, RecentsPlaylist):
boundary_date = datetime.datetime.now(datetime.timezone.utc) - \ boundary_date = datetime.datetime.now(datetime.timezone.utc) - \
datetime.timedelta(days=int(playlist_dict['day_boundary'])) datetime.timedelta(days=int(playlist.day_boundary))
tracks = engine.get_recent_playlist(params=params, tracks = engine.get_recent_playlist(params=params,
processors=processors, processors=processors,
boundary_date=boundary_date, boundary_date=boundary_date,
add_this_month=playlist_dict.get('add_this_month', False), add_this_month=playlist.add_this_month,
add_last_month=playlist_dict.get('add_last_month', False)) add_last_month=playlist.add_last_month)
else: else:
tracks = engine.make_playlist(params=params, tracks = engine.make_playlist(params=params,
processors=processors) processors=processors)
engine.execute_playlist(tracks, Uri(playlist_dict['uri'])) engine.execute_playlist(tracks, Uri(playlist.uri))
overwrite = playlist_dict.get('description_overwrite', None) overwrite = playlist.description_overwrite
suffix = playlist_dict.get('description_suffix', None) suffix = playlist.description_suffix
engine.change_description(sorted(submit_parts), engine.change_description(sorted(submit_parts),
uri=Uri(playlist_dict['uri']), uri=Uri(playlist.uri),
overwrite=overwrite, overwrite=overwrite,
suffix=suffix) suffix=suffix)
else: else:
logger.critical(f'multiple/no playlists found ({username}/{playlist_name})') logger.critical(f'playlist not found ({username}/{playlist_name})')
return None return None
else: else:
logger.critical(f'multiple/no user(s) found ({username}/{playlist_name})') logger.critical(f'{username} not found')
return None