validate input decorators

This commit is contained in:
aj 2020-07-29 10:45:40 +01:00
parent 6f9eef8d1f
commit 7397b40e9d
6 changed files with 207 additions and 216 deletions

View File

@ -7,9 +7,12 @@ import json
import logging
from datetime import datetime
from music.api.decorators import login_required, login_or_basic_auth, admin_required, gae_cron, cloud_task
from music.api.decorators import login_required, login_or_basic_auth, \
admin_required, gae_cron, cloud_task, validate_json, validate_args
from music.cloud import queue_run_user_playlist, offload_or_run_user_playlist
from music.cloud.tasks import update_all_user_playlists, update_playlists
from music.tasks.create_playlist import create_playlist
from music.tasks.run_user_playlist import run_user_playlist
from music.model.user import User
@ -29,174 +32,125 @@ logger = logging.getLogger(__name__)
def all_playlists_route(user=None):
assert user is not None
return jsonify({
'playlists': [i.to_dict() for i in Playlist.collection.parent(user.key).fetch()]
'playlists': [i.to_dict() for i in Playlist.collection.parent(user.key).fetch()]
}), 200
@blueprint.route('/playlist', methods=['GET', 'POST', 'PUT', 'DELETE'])
@blueprint.route('/playlist', methods=['GET', 'DELETE'])
@login_or_basic_auth
def playlist_route(user=None):
@validate_args(('name', str))
def playlist_get_delete_route(user=None):
if request.method == 'GET' or request.method == 'DELETE':
playlist_name = request.args.get('name', None)
playlist = Playlist.collection.parent(user.key).filter('name', '==', request.args['name']).get()
if playlist_name:
playlist = Playlist.collection.parent(user.key).filter('name', '==', playlist_name).get()
if playlist is None:
return jsonify({'error': f'playlist {request.args["name"]} not found'}), 404
if playlist is None:
return jsonify({'error': f'playlist {playlist_name} not found'}), 404
if request.method == "GET":
return jsonify(playlist.to_dict()), 200
if request.method == "GET":
return jsonify(playlist.to_dict()), 200
elif request.method == 'DELETE':
Playlist.collection.parent(user.key).delete(key=playlist.key)
return jsonify({"message": 'playlist deleted', "status": "success"}), 200
elif request.method == 'DELETE':
Playlist.collection.parent(user.key).delete(key=playlist.key)
return jsonify({"message": 'playlist deleted', "status": "success"}), 200
else:
return jsonify({"error": 'no name requested'}), 400
@blueprint.route('/playlist', methods=['POST', 'PUT'])
@login_or_basic_auth
@validate_json(('name', str))
def playlist_post_put_route(user=None):
elif request.method == 'POST' or request.method == 'PUT':
request_json = request.get_json()
request_json = request.get_json()
playlist_name = request_json['name']
playlist_references = []
if 'name' not in request_json:
return jsonify({'error': "no name provided"}), 400
if request_json.get('playlist_references', None):
if request_json['playlist_references'] != -1:
for i in request_json['playlist_references']:
playlist_name = request_json['name']
playlist = Playlist.collection.parent(user.key).filter('name', '==', i).get()
if playlist is not None:
playlist_references.append(db.document(playlist.key))
else:
return jsonify({"message": f'managed playlist {i} not found', "status": "error"}), 400
playlist_parts = request_json.get('parts', None)
if len(playlist_references) == 0 and request_json.get('playlist_references', None) != -1:
playlist_references = None
playlist_references = []
searched_playlist = Playlist.collection.parent(user.key).filter('name', '==', playlist_name).get()
if request_json.get('playlist_references', None):
if request_json['playlist_references'] != -1:
for i in request_json['playlist_references']:
# CREATE
if request.method == 'PUT':
updating_playlist = Playlist.collection.parent(user.key).filter('name', '==', i).get()
if updating_playlist is not None:
playlist_references.append(db.document(updating_playlist.key))
else:
return jsonify({"message": f'managed playlist {i} not found', "status": "error"}), 400
if searched_playlist is not None:
return jsonify({'error': 'playlist already exists'}), 400
if len(playlist_references) == 0 and request_json.get('playlist_references', None) != -1:
playlist_references = None
playlist = Playlist(parent=user.key)
playlist_uri = request_json.get('uri', None)
playlist_shuffle = request_json.get('shuffle', None)
playlist_type = request_json.get('type', None)
playlist.name = request_json['name']
playlist_day_boundary = request_json.get('day_boundary', None)
playlist_add_this_month = request_json.get('add_this_month', None)
playlist_add_last_month = request_json.get('add_last_month', None)
for key in [i for i in Playlist.mutable_keys if i not in ['playlist_references', 'type']]:
setattr(playlist, key, request_json.get(key, None))
playlist_library_tracks = request_json.get('include_library_tracks', None)
playlist.playlist_references = playlist_references
playlist_recommendation = request_json.get('include_recommendations', None)
playlist_recommendation_sample = request_json.get('recommendation_sample', None)
playlist.last_updated = datetime.utcnow()
playlist.lastfm_stat_last_refresh = datetime.utcnow()
playlist_chart_range = request_json.get('chart_range', None)
playlist_chart_limit = request_json.get('chart_limit', None)
if request_json.get('type'):
playlist_type = request_json['type'].strip().lower()
if playlist_type in ['default', 'recents', 'fmchart']:
playlist.type = playlist_type
else:
playlist.type = 'default'
logger.warning(f'invalid type ({playlist_type}), {user.username} / {playlist_name}')
if user.spotify_linked:
new_playlist = create_playlist(user, playlist_name)
playlist.uri = str(new_playlist.uri)
playlist.save()
logger.info(f'added {user.username} / {playlist_name}')
return jsonify({"message": 'playlist added', "status": "success"}), 201
# UPDATE
elif request.method == 'POST':
if searched_playlist is None:
return jsonify({'error': "playlist doesn't exist"}), 400
playlist = Playlist.collection.parent(user.key).filter('name', '==', playlist_name).get()
if request.method == 'PUT':
# ATTRIBUTES
for rec_key, rec_item in request_json.items():
# type and parts require extra validation
if rec_key in [k for k in Playlist.mutable_keys if k not in ['type', 'parts', 'playlist_references']]:
setattr(playlist, rec_key, request_json[rec_key])
if playlist is not None:
return jsonify({'error': 'playlist already exists'}), 400
# COMPONENTS
if request_json.get('parts'):
if request_json['parts'] == -1:
playlist.parts = []
else:
playlist.parts = request_json['parts']
from music.tasks.create_playlist import create_playlist
if playlist_references is not None:
if playlist_references == -1:
playlist.playlist_references = []
else:
playlist.playlist_references = playlist_references
new_db_playlist = Playlist(parent=user.key)
# ATTRIBUTE WITH CHECKS
if request_json.get('type'):
playlist_type = request_json['type'].strip().lower()
if playlist_type in ['default', 'recents', 'fmchart']:
playlist.type = playlist_type
new_db_playlist.name = playlist_name
new_db_playlist.parts = playlist_parts
new_db_playlist.playlist_references = playlist_references
playlist.update()
logger.info(f'updated {user.username} / {playlist_name}')
new_db_playlist.include_library_tracks = playlist_library_tracks
new_db_playlist.include_recommendations = playlist_recommendation
new_db_playlist.recommendation_sample = playlist_recommendation_sample
new_db_playlist.shuffle = playlist_shuffle
new_db_playlist.type = playlist_type
new_db_playlist.last_updated = datetime.utcnow()
new_db_playlist.lastfm_stat_last_refresh = datetime.utcnow()
new_db_playlist.day_boundary = playlist_day_boundary
new_db_playlist.add_this_month = playlist_add_this_month
new_db_playlist.add_last_month = playlist_add_last_month
new_db_playlist.chart_range = playlist_chart_range
new_db_playlist.chart_limit = playlist_chart_limit
if user.spotify_linked:
new_playlist = create_playlist(user, playlist_name)
new_db_playlist.uri = str(new_playlist.uri)
new_db_playlist.save()
logger.info(f'added {user.username} / {playlist_name}')
return jsonify({"message": 'playlist added', "status": "success"}), 201
elif request.method == 'POST':
if playlist is None:
return jsonify({'error': "playlist doesn't exist"}), 400
updating_playlist = Playlist.collection.parent(user.key).filter('name', '==', playlist_name).get()
if playlist_parts is not None:
if playlist_parts == -1:
updating_playlist.parts = []
else:
updating_playlist.parts = playlist_parts
if playlist_references is not None:
if playlist_references == -1:
updating_playlist.playlist_references = []
else:
updating_playlist.playlist_references = playlist_references
if playlist_uri is not None:
updating_playlist.uri = playlist_uri
if playlist_shuffle is not None:
updating_playlist.shuffle = playlist_shuffle
if playlist_day_boundary is not None:
updating_playlist.day_boundary = playlist_day_boundary
if playlist_add_this_month is not None:
updating_playlist.add_this_month = playlist_add_this_month
if playlist_add_last_month is not None:
updating_playlist.add_last_month = playlist_add_last_month
if playlist_library_tracks is not None:
updating_playlist.include_library_tracks = playlist_library_tracks
if playlist_recommendation is not None:
updating_playlist.include_recommendations = playlist_recommendation
if playlist_recommendation_sample is not None:
updating_playlist.recommendation_sample = playlist_recommendation_sample
if playlist_chart_range is not None:
updating_playlist.chart_range = playlist_chart_range
if playlist_chart_limit is not None:
updating_playlist.chart_limit = playlist_chart_limit
if playlist_type is not None:
playlist_type = playlist_type.strip().lower()
if playlist_type in ['default', 'recents', 'fmchart']:
updating_playlist.type = playlist_type
updating_playlist.update()
logger.info(f'updated {user.username} / {playlist_name}')
return jsonify({"message": 'playlist updated', "status": "success"}), 200
return jsonify({"message": 'playlist updated', "status": "success"}), 200
@blueprint.route('/user', methods=['GET', 'POST'])
@ -251,49 +205,38 @@ def all_users_route(user=None):
@blueprint.route('/user/password', methods=['POST'])
@login_required
@validate_json(('new_password', str), ('current_password', str))
def change_password(user=None):
request_json = request.get_json()
if 'new_password' in request_json and 'current_password' in request_json:
if len(request_json['new_password']) == 0:
return jsonify({"error": 'zero length password'}), 400
if len(request_json['new_password']) == 0:
return jsonify({"error": 'zero length password'}), 400
if len(request_json['new_password']) > 30:
return jsonify({"error": 'password too long'}), 400
if len(request_json['new_password']) > 30:
return jsonify({"error": 'password too long'}), 400
if user.check_password(request_json['current_password']):
user.password = generate_password_hash(request_json['new_password'])
user.update()
logger.info(f'password udpated {user.username}')
return jsonify({"message": 'password changed', "status": "success"}), 200
else:
logger.warning(f"incorrect password {user.username}")
return jsonify({'error': 'wrong password provided'}), 401
if user.check_password(request_json['current_password']):
user.password = generate_password_hash(request_json['new_password'])
user.update()
logger.info(f'password udpated {user.username}')
return jsonify({"message": 'password changed', "status": "success"}), 200
else:
return jsonify({'error': 'malformed request, no old_password/new_password'}), 400
logger.warning(f"incorrect password {user.username}")
return jsonify({'error': 'wrong password provided'}), 401
@blueprint.route('/playlist/run', methods=['GET'])
@login_or_basic_auth
@validate_args(('name', str))
def run_playlist(user=None):
playlist_name = request.args.get('name', None)
if playlist_name:
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
queue_run_user_playlist(user.username, playlist_name) # pass to either cloud tasks or functions
else:
run_user_playlist(user.username, playlist_name) # update synchronously
return jsonify({'message': 'execution requested', 'status': 'success'}), 200
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
queue_run_user_playlist(user.username, request.args['name']) # pass to either cloud tasks or functions
else:
logger.warning('no playlist requested')
return jsonify({"error": 'no name requested'}), 400
run_user_playlist(user.username, request.args['name']) # update synchronously
return jsonify({'message': 'execution requested', 'status': 'success'}), 200
@blueprint.route('/playlist/run/task', methods=['POST'])
@ -356,13 +299,10 @@ def run_users_cron():
@blueprint.route('/playlist/image', methods=['GET'])
@login_or_basic_auth
@validate_args(('name', str))
def image(user=None):
name = request.args.get('name', None)
if name is None:
return jsonify({'error': "no name provided"}), 400
_playlist = Playlist.collection.parent(user.key).filter('name', '==', name).get()
_playlist = Playlist.collection.parent(user.key).filter('name', '==', request.args['name']).get()
if _playlist is None:
return jsonify({'error': "playlist not found"}), 404

View File

@ -35,7 +35,7 @@ def login_required(func):
def login_required_wrapper(*args, **kwargs):
if is_logged_in():
user = User.collection.filter('username', '==', session['username'].strip().lower()).get()
return func(user=user, *args, **kwargs)
return func(*args, user=user, **kwargs)
else:
logger.warning('user not logged in')
return jsonify({'error': 'not logged in'}), 401
@ -47,11 +47,11 @@ def login_or_basic_auth(func):
def login_or_basic_auth_wrapper(*args, **kwargs):
if is_logged_in():
user = User.collection.filter('username', '==', session['username'].strip().lower()).get()
return func(user=user, *args, **kwargs)
return func(*args, user=user, **kwargs)
else:
check, user = is_basic_authed()
if check:
return func(user=user, *args, **kwargs)
return func(*args, user=user, **kwargs)
else:
logger.warning('user not logged in')
return jsonify({'error': 'not logged in'}), 401
@ -137,3 +137,44 @@ def cloud_task(func):
return jsonify({'status': 'error', 'message': 'unauthorised'}), 401
return cloud_task_wrapper
def validate_json(*expected_args):
def decorator_validate_json(func):
@functools.wraps(func)
def wrapper_validate_json(*args, **kwargs):
return check_dict(request_params=request.get_json(),
expected_args=expected_args,
func=func,
args=args, kwargs=kwargs)
return wrapper_validate_json
return decorator_validate_json
def validate_args(*expected_args):
def decorator_validate_args(func):
@functools.wraps(func)
def wrapper_validate_args(*args, **kwargs):
return check_dict(request_params=request.args,
expected_args=expected_args,
func=func,
args=args, kwargs=kwargs)
return wrapper_validate_args
return decorator_validate_args
def check_dict(request_params, expected_args, func, args, kwargs):
for expected_arg in expected_args:
if isinstance(expected_arg, tuple):
arg_key = expected_arg[0]
else:
arg_key = expected_arg
if arg_key not in request_params:
return jsonify({'status': 'error', 'message': f'{arg_key} not provided'}), 400
if isinstance(expected_arg, tuple):
if not isinstance(request_params[arg_key], expected_arg[1]):
return jsonify({'status': 'error', 'message': f'{arg_key} not of type {expected_arg[1]}'}), 400
return func(*args, **kwargs)

View File

@ -2,7 +2,7 @@ from flask import Blueprint, request, jsonify
import logging
from music.api.decorators import login_or_basic_auth, spotify_link_required
from music.api.decorators import login_or_basic_auth, spotify_link_required, validate_json
import music.db.database as database
from spotframework.net.network import SpotifyNetworkException
@ -91,39 +91,29 @@ def next_track(user=None):
@blueprint.route('/shuffle', methods=['POST'])
@login_or_basic_auth
@spotify_link_required
@validate_json(('state', bool))
def shuffle(user=None):
request_json = request.get_json()
if 'state' in request_json:
if isinstance(request_json['state'], bool):
net = database.get_authed_spotify_network(user)
player = Player(net)
net = database.get_authed_spotify_network(user)
player = Player(net)
player.shuffle(state=request_json['state'])
return jsonify({'message': f'shuffle set to {request_json["state"]}', 'status': 'success'}), 200
else:
return jsonify({'error': "state not a boolean"}), 400
else:
return jsonify({'error': "no state provided"}), 400
player.shuffle(state=request_json['state'])
return jsonify({'message': f'shuffle set to {request_json["state"]}', 'status': 'success'}), 200
@blueprint.route('/volume', methods=['POST'])
@login_or_basic_auth
@spotify_link_required
@validate_json(('volume', int))
def volume(user=None):
request_json = request.get_json()
if 'volume' in request_json:
if isinstance(request_json['volume'], int):
if 0 <= request_json['volume'] <= 100:
net = database.get_authed_spotify_network(user)
player = Player(net)
if 0 <= request_json['volume'] <= 100:
net = database.get_authed_spotify_network(user)
player = Player(net)
player.volume(value=request_json['volume'])
return jsonify({'message': f'volume set to {request_json["volume"]}', 'status': 'success'}), 200
else:
return jsonify({'error': "volume must be between 0 and 100"}), 400
else:
return jsonify({'error': "volume not a integer"}), 400
player.volume(value=request_json['volume'])
return jsonify({'message': f'volume set to {request_json["volume"]}', 'status': 'success'}), 200
else:
return jsonify({'error': "no volume provided"}), 400
return jsonify({'error': "volume must be between 0 and 100"}), 400

View File

@ -3,7 +3,8 @@ import logging
import json
import os
from music.api.decorators import admin_required, login_or_basic_auth, lastfm_username_required, spotify_link_required, cloud_task, gae_cron
from music.api.decorators import admin_required, login_or_basic_auth, lastfm_username_required, \
spotify_link_required, cloud_task, gae_cron, validate_args
import music.db.database as database
from music.cloud.tasks import refresh_all_user_playlist_stats, refresh_user_playlist_stats, refresh_playlist_task
from music.tasks.refresh_lastfm_stats import refresh_lastfm_track_stats, \
@ -71,24 +72,19 @@ def count(user=None):
@login_or_basic_auth
@spotify_link_required
@lastfm_username_required
@validate_args(('name', str))
def playlist_refresh(user=None):
playlist_name = request.args.get('name', None)
if playlist_name:
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
refresh_playlist_task(user.username, playlist_name)
else:
refresh_lastfm_track_stats(user.username, playlist_name)
refresh_lastfm_album_stats(user.username, playlist_name)
refresh_lastfm_artist_stats(user.username, playlist_name)
return jsonify({'message': 'execution requested', 'status': 'success'}), 200
playlist_name = request.args['name']
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
refresh_playlist_task(user.username, playlist_name)
else:
logger.warning('no playlist requested')
return jsonify({"error": 'no name requested'}), 400
refresh_lastfm_track_stats(user.username, playlist_name)
refresh_lastfm_album_stats(user.username, playlist_name)
refresh_lastfm_artist_stats(user.username, playlist_name)
return jsonify({'message': 'execution requested', 'status': 'success'}), 200
@blueprint.route('/playlist/refresh/task/track', methods=['POST'])

View File

@ -3,7 +3,7 @@ from werkzeug.security import generate_password_hash
from music.model.user import User
from music.model.config import Config
import urllib
from urllib.parse import urlencode, urlunparse
import datetime
import logging
from base64 import b64encode
@ -111,16 +111,17 @@ def auth():
if 'username' in session:
config = Config.collection.get("config/music-tools")
params = urllib.parse.urlencode(
params = urlencode(
{
'client_id': config.spotify_client_id,
'response_type': 'code',
'scope': 'playlist-modify-public playlist-modify-private playlist-read-private user-read-playback-state user-modify-playback-state user-library-read',
'scope': 'playlist-modify-public playlist-modify-private playlist-read-private '
'user-read-playback-state user-modify-playback-state user-library-read',
'redirect_uri': 'https://music.sarsoo.xyz/auth/spotify/token'
}
)
return redirect(urllib.parse.urlunparse(['https', 'accounts.spotify.com', 'authorize', '', params, '']))
return redirect(urlunparse(['https', 'accounts.spotify.com', 'authorize', '', params, '']))
return redirect(url_for('index'))

View File

@ -49,6 +49,29 @@ class Playlist(Model):
chart_range = TextField(default='MONTH')
chart_limit = NumberField(default=50)
mutable_keys = [
'type',
'include_recommendations',
'recommendation_sample',
'include_library_tracks',
'parts',
'playlist_references',
'shuffle',
'sort',
'description_overwrite',
'description_suffix',
'add_last_month',
'add_this_month',
'day_boundary',
'chart_range',
'chart_limit'
]
def to_dict(self):
to_return = super().to_dict()