walrus refactors

This commit is contained in:
Andy Pack 2022-11-28 22:30:00 +00:00
parent c770ac5c7d
commit de23eb0065
Signed by: sarsoo
GPG Key ID: A55BA3536A5E0ED7
3 changed files with 48 additions and 51 deletions

View File

@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
@blueprint.route('/playlists', methods=['GET'])
@login_or_jwt
@no_locked_users
def all_playlists_route(auth=None, user=None):
def all_playlists_route(auth: dict = None, user: User = None):
"""Retrieve all playlists for a given user
Args:
@ -50,12 +50,12 @@ def all_playlists_route(auth=None, user=None):
@login_or_jwt
@no_locked_users
@validate_args(('name', str))
def playlist_get_delete_route(auth=None,user=None):
def playlist_get_delete_route(auth: dict = None, user: User = None):
playlist = user.get_playlist(request.args['name'], raise_error=False)
playlist = user.get_playlist(name := request.args['name'], raise_error=False)
if playlist is None:
return jsonify({'error': f'playlist {request.args["name"]} not found'}), 404
return jsonify({'error': f'playlist {name} not found'}), 404
if request.method == "GET":
return jsonify(playlist.to_dict()), 200
@ -69,7 +69,7 @@ def playlist_get_delete_route(auth=None,user=None):
@login_or_jwt
@no_locked_users
@validate_json(('name', str))
def playlist_post_put_route(auth=None, user=None):
def playlist_post_put_route(auth: dict = None, user: User = None):
request_json = request.get_json()
@ -109,8 +109,8 @@ def playlist_post_put_route(auth=None, user=None):
playlist.last_updated = datetime.utcnow()
playlist.lastfm_stat_last_refresh = datetime.utcnow()
if request_json.get('type'):
playlist_type = request_json['type'].strip().lower()
if playlist_type := request_json.get('type'):
playlist_type = playlist_type.strip().lower()
if playlist_type in ['default', 'recents', 'fmchart']:
playlist.type = playlist_type
else:
@ -188,7 +188,7 @@ def playlist_post_put_route(auth=None, user=None):
@blueprint.route('/user', methods=['GET', 'POST'])
@login_or_jwt
@no_locked_users
def user_route(auth=None, user: User=None):
def user_route(auth: dict = None, user: User = None):
assert user is not None
if request.method == 'GET':
@ -197,24 +197,22 @@ def user_route(auth=None, user: User=None):
else: # POST
request_json = request.get_json()
if 'username' in request_json:
if request_json['username'].strip().lower() != user.username:
if user.type != "admin":
return jsonify({'status': 'error', 'message': 'unauthorized'}), 401
if (username := request_json.get('username')) and username.strip().lower() != user.username:
if user.type != "admin":
return jsonify({'status': 'error', 'message': 'unauthorized'}), 401
user = User.collection.filter('username', '==', request_json['username'].strip().lower()).get()
user = User.collection.filter('username', '==', request_json['username'].strip().lower()).get()
if 'locked' in request_json:
if user.type == "admin":
logger.info(f'updating lock {user.username} / {request_json["locked"]}')
user.locked = request_json['locked']
if (locked := request_json.get('locked')) and user.type == "admin":
logger.info(f'updating lock {user.username} / {locked}')
user.locked = locked
if 'spotify_linked' in request_json:
if (spotify_linked := request_json.get('spotify_linked')) and not spotify_linked:
logger.info(f'deauthing {user.username}')
if request_json['spotify_linked'] is False:
user.access_token = None
user.refresh_token = None
user.spotify_linked = False
user.access_token = None
user.refresh_token = None
user.spotify_linked = False
if 'lastfm_username' in request_json:
logger.info(f'updating lastfm username {user.username} -> {request_json["lastfm_username"]}')
@ -223,17 +221,15 @@ def user_route(auth=None, user: User=None):
if user.lastfm_username is None:
user.lastfm_username = ""
if 'apns_token' in request_json:
token = request_json['apns_token']
if apns_token := request_json.get('apns_token'):
if user.apns_tokens is None:
user.apns_tokens = []
if token not in user.apns_tokens:
logger.info(f'adding apns token {user.username} -> {token}')
user.apns_tokens = user.apns_tokens + [token]
if apns_token not in user.apns_tokens:
logger.info(f'adding apns token {user.username} -> {apns_token}')
user.apns_tokens = user.apns_tokens + [apns_token]
else:
logger.info(f'skipping duplicate apns token {user.username} -> {token}')
logger.info(f'skipping duplicate apns token {user.username} -> {apns_token}')
user.update()
@ -241,9 +237,10 @@ def user_route(auth=None, user: User=None):
return jsonify({'message': 'account updated', 'status': 'succeeded'}), 200
@blueprint.route('/user', methods=['DELETE'])
@login_or_jwt
def user_delete_route(auth=None, user=None):
def user_delete_route(auth: dict = None, user: User = None):
assert user is not None
if user.type == 'admin' and (username_override := request.args.get('username')) is not None:
@ -255,11 +252,12 @@ def user_delete_route(auth=None, user=None):
return jsonify({'message': 'account deleted', 'status': 'succeeded'}), 200
@blueprint.route('/users', methods=['GET'])
@login_or_jwt
@admin_required
@no_locked_users
def all_users_route(auth=None, user=None):
def all_users_route(auth: dict = None, user: User = None):
return jsonify({
'accounts': [i.to_dict() for i in User.collection.fetch()]
}), 200
@ -269,17 +267,17 @@ def all_users_route(auth=None, user=None):
@login_required
@no_locked_users
@validate_json(('new_password', str), ('current_password', str))
def change_password(user=None):
def change_password(user: User = None):
request_json = request.get_json()
if len(request_json['new_password']) == 0:
if len(new_password := request_json['new_password']) == 0:
return jsonify({"error": 'zero length password'}), 400
if len(request_json['new_password']) > 30:
if len(new_password) > 30:
return jsonify({"error": 'password too long'}), 400
if user.check_password(request_json['current_password']):
user.password = generate_password_hash(request_json['new_password'])
user.password = generate_password_hash(new_password)
user.update()
logger.info(f'password udpated {user.username}')
@ -293,7 +291,7 @@ def change_password(user=None):
@login_or_jwt
@no_locked_users
@validate_args(('name', str))
def run_playlist(auth=None, user=None):
def run_playlist(auth: dict = None, user: User = None):
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
queue_run_user_playlist(user.username, request.args['name']) # pass to either cloud tasks or functions
@ -323,7 +321,7 @@ def run_playlist_task(): # receives cloud tasks request for update
@blueprint.route('/playlist/run/user', methods=['GET'])
@login_or_jwt
@no_locked_users
def run_user(auth=None, user=None):
def run_user(auth: dict = None, user: User = None):
if user.type == 'admin':
user_name = request.args.get('username', user.username)
@ -349,7 +347,7 @@ def run_user_task():
@login_or_jwt
@admin_required
@no_locked_users
def run_users(auth=None, user=None):
def run_users(auth: dict = None, user: User = None):
update_all_user_playlists()
return jsonify({'message': 'executed all users', 'status': 'success'}), 200
@ -360,7 +358,7 @@ def run_users(auth=None, user=None):
@spotify_link_required
@no_locked_users
@validate_args(('name', str))
def image(auth=None, user=None):
def image(auth: dict = None, user: User = None):
_playlist = user.get_playlist(request.args['name'], raise_error=False)
if _playlist is None:

View File

@ -26,10 +26,9 @@ logger = logging.getLogger(__name__)
@no_locked_users
def count(auth=None, user=None):
uri = request.args.get('uri', None)
playlist_name = request.args.get('playlist_name', None)
if uri is None and playlist_name is None:
if uri := request.args.get('uri') is None and playlist_name is None:
return jsonify({'error': 'no input provided'}), 401
if uri:

View File

@ -60,38 +60,38 @@ def put_tag(tag_id, user):
request_json = request.get_json()
if request_json.get('name'):
db_tag.name = request_json['name'].strip()
if name := request_json.get('name'):
db_tag.name = name.strip()
if request_json.get('time_objects') is not None:
db_tag.time_objects = request_json['time_objects']
if time_objects := request_json.get('time_objects') is not None:
db_tag.time_objects = time_objects
if request_json.get('tracks') is not None:
if tracks := request_json.get('tracks') is not None:
db_tag.tracks = [
{
'name': track['name'].strip(),
'artist': track['artist'].strip()
}
for track in request_json['tracks']
for track in tracks
if track.get('name') and track.get('artist')
]
if request_json.get('albums') is not None:
if albums := request_json.get('albums') is not None:
db_tag.albums = [
{
'name': album['name'].strip(),
'artist': album['artist'].strip()
}
for album in request_json['albums']
for album in albums
if album.get('name') and album.get('artist')
]
if request_json.get('artists') is not None:
if artists := request_json.get('artists') is not None:
db_tag.artists = [
{
'name': artist['name'].strip()
}
for artist in request_json['artists']
for artist in artists
if artist.get('name')
]