Mixonomer/music/auth/auth.py

291 lines
9.2 KiB
Python

from flask import Blueprint, session, flash, request, redirect, url_for, render_template, jsonify
from werkzeug.security import generate_password_hash
from music.model.user import User, get_admins
from music.model.config import Config
from music.auth.jwt_keys import generate_key
from music.api.decorators import no_cache
from music.notif.notifier import notify_admin_new_user
from music.magic_strings import SPOT_CLIENT_URI, SPOT_SECRET_URI
from urllib.parse import urlencode, urlunparse
import datetime
from datetime import timedelta
from numbers import Number
import logging
from base64 import b64encode
from google.cloud import secretmanager
import requests
blueprint = Blueprint('authapi', __name__)
logger = logging.getLogger(__name__)
secret_client = secretmanager.SecretManagerServiceClient()
@blueprint.route('/login', methods=['GET', 'POST'])
@no_cache
def login():
"""Login route allowing retrieval of HTML page and submission of results
Returns:
HTTP Response: Home page redirect for GET, login request on POST
"""
if request.method == 'POST':
session.pop('username', None)
username = request.form.get('username', None)
password = request.form.get('password', None)
if username is None or password is None:
flash('malformed request')
return redirect(url_for('index'))
user = User.collection.filter('username', '==', username.strip().lower()).get()
if user is None:
flash('user not found')
return redirect(url_for('index'))
if user.check_password(password):
if user.locked:
logger.warning(f'locked account attempt {username}')
flash('account locked')
return redirect(url_for('index'))
user.last_login = datetime.datetime.utcnow()
user.update()
logger.info(f'success {username}')
session['username'] = username
return redirect(url_for('app_route'))
else:
logger.warning(f'failed attempt {username}')
flash('incorrect password')
return redirect(url_for('index'))
else:
return redirect(url_for('index'))
@blueprint.route('/logout', methods=['GET', 'POST'])
@no_cache
def logout():
if 'username' in session:
logger.info(f'logged out {session["username"]}')
session.pop('username', None)
flash('logged out')
return redirect(url_for('index'))
@blueprint.route('/token', methods=['POST'])
@no_cache
def jwt_token():
"""Generate JWT
Returns:
HTTP Response: token request on POST
"""
request_json = request.get_json()
username = request_json.get('username', None)
password = request_json.get('password', None)
if username is None or password is None:
return jsonify({"message": 'username and password fields required', "status": "error"}), 400
user = User.collection.filter('username', '==', username.strip().lower()).get()
if user is None:
return jsonify({"message": 'user not found', "status": "error"}), 404
if user.check_password(password):
if user.locked:
logger.warning(f'locked account token attempt {username}')
return jsonify({"message": 'user locked', "status": "error"}), 403
user.last_keygen = datetime.datetime.utcnow()
user.update()
logger.info(f'generating token for {username}')
config = Config.collection.get("config/music-tools")
if isinstance(expiry := request_json.get('expiry', None), Number):
expiry = min(expiry, config.jwt_max_length)
else:
expiry = config.jwt_default_length
generated_token = generate_key(user, timeout=timedelta(seconds=expiry))
return jsonify({"token": generated_token, "status": "success"}), 200
else:
logger.warning(f'failed token attempt {username}')
return jsonify({"message": 'authentication failed', "status": "error"}), 401
@blueprint.route('/register', methods=['GET', 'POST'])
@no_cache
def register():
if 'username' in session:
return redirect(url_for('index'))
if request.method == 'GET':
return render_template('register.html')
else:
api_user = False
username = request.form.get('username', None)
password = request.form.get('password', None)
password_again = request.form.get('password_again', None)
if username is None or password is None or password_again is None:
if (request_json := request.get_json()) is not None:
username = request_json.get('username', None)
password = request_json.get('password', None)
password_again = request_json.get('password_again', None)
api_user = True
if username is None or password is None or password_again is None:
logger.info(f'malformed register api request, {username}')
return jsonify({'status': 'error', 'message': 'malformed request'}), 400
else:
flash('malformed request')
return redirect('authapi.register')
username = username.lower()
if password != password_again:
if api_user:
return jsonify({'message': 'passwords didnt match', 'status': 'error'}), 400
else:
flash('password mismatch')
return redirect('authapi.register')
if username in [i.username for i in
User.collection.fetch()]:
if api_user:
return jsonify({'message': 'user already exists', 'status': 'error'}), 409
else:
flash('username already registered')
return redirect('authapi.register')
user = User()
user.username = username
user.password = generate_password_hash(password)
user.last_login = datetime.datetime.utcnow()
user.save()
logger.info(f'new user {username}')
for admin in get_admins():
notify_admin_new_user(admin, username)
if api_user:
return jsonify({'message': 'account created', 'status': 'succeeded'}), 201
else:
session['username'] = username
return redirect(url_for('authapi.auth'))
@blueprint.route('/spotify')
@no_cache
def auth():
if 'username' in session:
spot_client = secret_client.access_secret_version(request={"name": SPOT_CLIENT_URI})
params = urlencode(
{
'client_id': spot_client.payload.data.decode("UTF-8"),
'response_type': 'code',
'scope': 'playlist-modify-public playlist-modify-private playlist-read-private '
'user-read-playback-state user-modify-playback-state user-library-read',
'redirect_uri': 'https://mixonomer.sarsoo.xyz/auth/spotify/token'
}
)
return redirect(urlunparse(['https', 'accounts.spotify.com', 'authorize', '', params, '']))
return redirect(url_for('index'))
@blueprint.route('/spotify/token')
@no_cache
def token():
if 'username' in session:
code = request.args.get('code', None)
if code is None:
flash('authorization failed')
return redirect('app_route')
else:
spot_client = secret_client.access_secret_version(request={"name": SPOT_CLIENT_URI})
spot_secret = secret_client.access_secret_version(request={"name": SPOT_SECRET_URI})
idsecret = b64encode(
bytes(spot_client.payload.data.decode("UTF-8") + ':' + spot_secret.payload.data.decode("UTF-8"), "utf-8")
).decode("ascii")
headers = {'Authorization': 'Basic %s' % idsecret}
data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': 'https://mixonomer.sarsoo.xyz/auth/spotify/token'
}
req = requests.post('https://accounts.spotify.com/api/token', data=data, headers=headers)
if 200 <= req.status_code < 300:
resp = req.json()
user = User.collection.filter('username', '==', session['username'].strip().lower()).get()
user.access_token = resp['access_token']
user.refresh_token = resp['refresh_token']
user.last_refreshed = datetime.datetime.now(datetime.timezone.utc)
user.token_expiry = resp['expires_in']
user.spotify_linked = True
user.update()
else:
flash('http error on token request')
return redirect('app_route')
return redirect('/app/settings/spotify')
return redirect(url_for('index'))
@blueprint.route('/spotify/deauth')
@no_cache
def deauth():
if 'username' in session:
user = User.collection.filter('username', '==', session['username'].strip().lower()).get()
user.access_token = None
user.refresh_token = None
user.last_refreshed = datetime.datetime.now(datetime.timezone.utc)
user.token_expiry = None
user.spotify_linked = False
user.update()
return redirect('/app/settings/spotify')
return redirect(url_for('index'))