diff --git a/music/api/api.py b/music/api/api.py index c4c3616..6212d32 100644 --- a/music/api/api.py +++ b/music/api/api.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) @blueprint.route('/playlists', methods=['GET']) @login_or_jwt @no_locked_users -def all_playlists_route(auth=None, user=None): +def all_playlists_route(auth: dict = None, user: User = None): """Retrieve all playlists for a given user Args: @@ -50,12 +50,12 @@ def all_playlists_route(auth=None, user=None): @login_or_jwt @no_locked_users @validate_args(('name', str)) -def playlist_get_delete_route(auth=None,user=None): +def playlist_get_delete_route(auth: dict = None, user: User = None): - playlist = user.get_playlist(request.args['name'], raise_error=False) + playlist = user.get_playlist(name := request.args['name'], raise_error=False) if playlist is None: - return jsonify({'error': f'playlist {request.args["name"]} not found'}), 404 + return jsonify({'error': f'playlist {name} not found'}), 404 if request.method == "GET": return jsonify(playlist.to_dict()), 200 @@ -69,7 +69,7 @@ def playlist_get_delete_route(auth=None,user=None): @login_or_jwt @no_locked_users @validate_json(('name', str)) -def playlist_post_put_route(auth=None, user=None): +def playlist_post_put_route(auth: dict = None, user: User = None): request_json = request.get_json() @@ -109,8 +109,8 @@ def playlist_post_put_route(auth=None, user=None): playlist.last_updated = datetime.utcnow() playlist.lastfm_stat_last_refresh = datetime.utcnow() - if request_json.get('type'): - playlist_type = request_json['type'].strip().lower() + if playlist_type := request_json.get('type'): + playlist_type = playlist_type.strip().lower() if playlist_type in ['default', 'recents', 'fmchart']: playlist.type = playlist_type else: @@ -188,7 +188,7 @@ def playlist_post_put_route(auth=None, user=None): @blueprint.route('/user', methods=['GET', 'POST']) @login_or_jwt @no_locked_users -def user_route(auth=None, user: User=None): +def user_route(auth: dict = None, user: User = None): assert user is not None if request.method == 'GET': @@ -197,24 +197,22 @@ def user_route(auth=None, user: User=None): else: # POST request_json = request.get_json() - if 'username' in request_json: - if request_json['username'].strip().lower() != user.username: - if user.type != "admin": - return jsonify({'status': 'error', 'message': 'unauthorized'}), 401 + if (username := request_json.get('username')) and username.strip().lower() != user.username: + if user.type != "admin": + return jsonify({'status': 'error', 'message': 'unauthorized'}), 401 - user = User.collection.filter('username', '==', request_json['username'].strip().lower()).get() + user = User.collection.filter('username', '==', request_json['username'].strip().lower()).get() - if 'locked' in request_json: - if user.type == "admin": - logger.info(f'updating lock {user.username} / {request_json["locked"]}') - user.locked = request_json['locked'] + if (locked := request_json.get('locked')) and user.type == "admin": + logger.info(f'updating lock {user.username} / {locked}') + user.locked = locked - if 'spotify_linked' in request_json: + if (spotify_linked := request_json.get('spotify_linked')) and not spotify_linked: logger.info(f'deauthing {user.username}') - if request_json['spotify_linked'] is False: - user.access_token = None - user.refresh_token = None - user.spotify_linked = False + + user.access_token = None + user.refresh_token = None + user.spotify_linked = False if 'lastfm_username' in request_json: logger.info(f'updating lastfm username {user.username} -> {request_json["lastfm_username"]}') @@ -223,17 +221,15 @@ def user_route(auth=None, user: User=None): if user.lastfm_username is None: user.lastfm_username = "" - if 'apns_token' in request_json: - token = request_json['apns_token'] - + if apns_token := request_json.get('apns_token'): if user.apns_tokens is None: user.apns_tokens = [] - if token not in user.apns_tokens: - logger.info(f'adding apns token {user.username} -> {token}') - user.apns_tokens = user.apns_tokens + [token] + if apns_token not in user.apns_tokens: + logger.info(f'adding apns token {user.username} -> {apns_token}') + user.apns_tokens = user.apns_tokens + [apns_token] else: - logger.info(f'skipping duplicate apns token {user.username} -> {token}') + logger.info(f'skipping duplicate apns token {user.username} -> {apns_token}') user.update() @@ -241,9 +237,10 @@ def user_route(auth=None, user: User=None): return jsonify({'message': 'account updated', 'status': 'succeeded'}), 200 + @blueprint.route('/user', methods=['DELETE']) @login_or_jwt -def user_delete_route(auth=None, user=None): +def user_delete_route(auth: dict = None, user: User = None): assert user is not None if user.type == 'admin' and (username_override := request.args.get('username')) is not None: @@ -255,11 +252,12 @@ def user_delete_route(auth=None, user=None): return jsonify({'message': 'account deleted', 'status': 'succeeded'}), 200 + @blueprint.route('/users', methods=['GET']) @login_or_jwt @admin_required @no_locked_users -def all_users_route(auth=None, user=None): +def all_users_route(auth: dict = None, user: User = None): return jsonify({ 'accounts': [i.to_dict() for i in User.collection.fetch()] }), 200 @@ -269,17 +267,17 @@ def all_users_route(auth=None, user=None): @login_required @no_locked_users @validate_json(('new_password', str), ('current_password', str)) -def change_password(user=None): +def change_password(user: User = None): request_json = request.get_json() - if len(request_json['new_password']) == 0: + if len(new_password := request_json['new_password']) == 0: return jsonify({"error": 'zero length password'}), 400 - if len(request_json['new_password']) > 30: + if len(new_password) > 30: return jsonify({"error": 'password too long'}), 400 if user.check_password(request_json['current_password']): - user.password = generate_password_hash(request_json['new_password']) + user.password = generate_password_hash(new_password) user.update() logger.info(f'password udpated {user.username}') @@ -293,7 +291,7 @@ def change_password(user=None): @login_or_jwt @no_locked_users @validate_args(('name', str)) -def run_playlist(auth=None, user=None): +def run_playlist(auth: dict = None, user: User = None): if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD': queue_run_user_playlist(user.username, request.args['name']) # pass to either cloud tasks or functions @@ -323,7 +321,7 @@ def run_playlist_task(): # receives cloud tasks request for update @blueprint.route('/playlist/run/user', methods=['GET']) @login_or_jwt @no_locked_users -def run_user(auth=None, user=None): +def run_user(auth: dict = None, user: User = None): if user.type == 'admin': user_name = request.args.get('username', user.username) @@ -349,7 +347,7 @@ def run_user_task(): @login_or_jwt @admin_required @no_locked_users -def run_users(auth=None, user=None): +def run_users(auth: dict = None, user: User = None): update_all_user_playlists() return jsonify({'message': 'executed all users', 'status': 'success'}), 200 @@ -360,7 +358,7 @@ def run_users(auth=None, user=None): @spotify_link_required @no_locked_users @validate_args(('name', str)) -def image(auth=None, user=None): +def image(auth: dict = None, user: User = None): _playlist = user.get_playlist(request.args['name'], raise_error=False) if _playlist is None: diff --git a/music/api/spotfm.py b/music/api/spotfm.py index 3ad25e5..cb03888 100644 --- a/music/api/spotfm.py +++ b/music/api/spotfm.py @@ -26,10 +26,9 @@ logger = logging.getLogger(__name__) @no_locked_users def count(auth=None, user=None): - uri = request.args.get('uri', None) playlist_name = request.args.get('playlist_name', None) - if uri is None and playlist_name is None: + if uri := request.args.get('uri') is None and playlist_name is None: return jsonify({'error': 'no input provided'}), 401 if uri: diff --git a/music/api/tag.py b/music/api/tag.py index 9d583e3..be041be 100644 --- a/music/api/tag.py +++ b/music/api/tag.py @@ -60,38 +60,38 @@ def put_tag(tag_id, user): request_json = request.get_json() - if request_json.get('name'): - db_tag.name = request_json['name'].strip() + if name := request_json.get('name'): + db_tag.name = name.strip() - if request_json.get('time_objects') is not None: - db_tag.time_objects = request_json['time_objects'] + if time_objects := request_json.get('time_objects') is not None: + db_tag.time_objects = time_objects - if request_json.get('tracks') is not None: + if tracks := request_json.get('tracks') is not None: db_tag.tracks = [ { 'name': track['name'].strip(), 'artist': track['artist'].strip() } - for track in request_json['tracks'] + for track in tracks if track.get('name') and track.get('artist') ] - if request_json.get('albums') is not None: + if albums := request_json.get('albums') is not None: db_tag.albums = [ { 'name': album['name'].strip(), 'artist': album['artist'].strip() } - for album in request_json['albums'] + for album in albums if album.get('name') and album.get('artist') ] - if request_json.get('artists') is not None: + if artists := request_json.get('artists') is not None: db_tag.artists = [ { 'name': artist['name'].strip() } - for artist in request_json['artists'] + for artist in artists if artist.get('name') ]