proof of concept jwt auth

This commit is contained in:
andy 2022-08-08 18:37:17 +01:00
parent eb479274cd
commit ac6cc976eb
8 changed files with 194 additions and 5 deletions

View File

@ -7,7 +7,7 @@ import json
import logging
from datetime import datetime
from music.api.decorators import login_required, login_or_basic_auth, \
from music.api.decorators import login_or_jwt, login_required, login_or_basic_auth, \
admin_required, cloud_task, validate_json, validate_args, spotify_link_required
from music.cloud import queue_run_user_playlist, offload_or_run_user_playlist
from music.cloud.tasks import update_all_user_playlists, update_playlists
@ -46,9 +46,9 @@ def all_playlists_route(user=None):
@blueprint.route('/playlist', methods=['GET', 'DELETE'])
@login_or_basic_auth
@login_or_jwt
@validate_args(('name', str))
def playlist_get_delete_route(user=None):
def playlist_get_delete_route(auth=None,user=None):
playlist = user.get_playlist(request.args['name'], raise_error=False)

View File

@ -4,6 +4,7 @@ import logging
from flask import session, request, jsonify
from music.model.user import User
from music.auth.jwt_keys import validate_key
logger = logging.getLogger(__name__)
@ -30,6 +31,19 @@ def is_basic_authed():
return False, None
def is_jwt_authed():
if request.headers.get('Authorization', None):
unparsed = request.headers.get('Authorization')
if unparsed.startswith('Bearer '):
token = validate_key(unparsed.removeprefix('Bearer ').strip())
if token is not None:
return token
def login_required(func):
@functools.wraps(func)
def login_required_wrapper(*args, **kwargs):
@ -42,6 +56,48 @@ def login_required(func):
return login_required_wrapper
def login_or_jwt(func):
@functools.wraps(func)
def login_or_jwt_wrapper(*args, **kwargs):
if is_logged_in():
user = User.collection.filter('username', '==', session['username'].strip().lower()).get()
return func(*args, user=user, **kwargs)
else:
token = is_jwt_authed()
if token is not None:
user = User.collection.filter('username', '==', token['sub'].strip().lower()).get()
if user is not None:
return func(*args, auth=token, user=user, **kwargs)
else:
logger.warning(f'user {token["sub"]} not found')
return jsonify({'error': f'user {token["sub"]} not found'}), 404
else:
logger.warning('user not authorised')
return jsonify({'error': 'not authorised'}), 401
return login_or_jwt_wrapper
def jwt_required(func):
@functools.wraps(func)
def jwt_required_wrapper(*args, **kwargs):
token = is_jwt_authed()
if token is not None:
user = User.collection.filter('username', '==', token['sub'].strip().lower()).get()
if user is not None:
return func(*args, auth=token, user=user, **kwargs)
else:
logger.warning(f'user {token["sub"]} not found')
return jsonify({'error': f'user {token["sub"]} not found'}), 404
else:
logger.warning('user not authorised')
return jsonify({'error': 'not authorised'}), 401
return jwt_required_wrapper
def login_or_basic_auth(func):
@functools.wraps(func)
def login_or_basic_auth_wrapper(*args, **kwargs):

View File

@ -1,7 +1,8 @@
from flask import Blueprint, session, flash, request, redirect, url_for, render_template
from flask import Blueprint, session, flash, request, redirect, url_for, render_template, jsonify
from werkzeug.security import generate_password_hash
from music.model.user import User
from music.model.config import Config
from music.auth.jwt_keys import generate_key
from urllib.parse import urlencode, urlunparse
import datetime
@ -68,6 +69,44 @@ def logout():
flash('logged out')
return redirect(url_for('index'))
@blueprint.route('/token', methods=['POST'])
def jwt_token():
"""Generate JWT
Returns:
HTTP Response: token request on POST
"""
request_json = request.get_json()
username = request_json.get('username', None)
password = request_json.get('password', None)
if username is None or password is None:
return jsonify({"message": 'username and password fields required', "status": "error"}), 400
user = User.collection.filter('username', '==', username.strip().lower()).get()
if user is None:
return jsonify({"message": 'user not found', "status": "error"}), 404
if user.check_password(password):
if user.locked:
logger.warning(f'locked account token attempt {username}')
return jsonify({"message": 'user locked', "status": "error"}), 403
user.last_login = datetime.datetime.utcnow()
user.update()
logger.info(f'generating token for {username}')
token = generate_key(user)
return jsonify({"token": token, "status": "success"}), 200
else:
logger.warning(f'failed token attempt {username}')
return jsonify({"message": 'authentication failed', "status": "error"}), 401
@blueprint.route('/register', methods=['GET', 'POST'])
def register():

41
music/auth/jwt_keys.py Normal file
View File

@ -0,0 +1,41 @@
from ctypes import Union
from datetime import timedelta, datetime, timezone
import jwt
from music.model.user import User
from music.model.config import Config
def get_jwt_secret_key() -> str:
config = Config.collection.get("config/music-tools")
if config.jwt_secret_key is None or len(config.jwt_secret_key) == 0:
raise KeyError("no jwt secret key found")
return config.jwt_secret_key
def generate_key(user: User, timeout: datetime | timedelta = timedelta(minutes=60)) -> str:
if isinstance(timeout, timedelta):
exp = timeout + datetime.now(tz=timezone.utc)
else:
exp = timeout
payload = {
"exp": exp,
"iss": "mixonomer-api",
"sub": user.username
}
return jwt.encode(payload, get_jwt_secret_key(), algorithm="HS512")
def validate_key(key: str) -> dict:
try:
decoded = jwt.decode(key, get_jwt_secret_key(), algorithms=["HS512"], options={
"require": ["exp", "sub"]
})
return decoded
except Exception as e:
pass

View File

@ -19,3 +19,4 @@ class Config(Model):
"""Determines whether playlist and tag update operations are done by Cloud Tasks or Functions
"""
secret_key = TextField()
jwt_secret_key = TextField()

20
poetry.lock generated
View File

@ -462,6 +462,20 @@ python-versions = "*"
[package.dependencies]
pyasn1 = ">=0.4.6,<0.5.0"
[[package]]
name = "pyjwt"
version = "2.4.0"
description = "JSON Web Token implementation in Python"
category = "main"
optional = false
python-versions = ">=3.6"
[package.extras]
tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"]
docs = ["zope.interface", "sphinx-rtd-theme", "sphinx"]
dev = ["pre-commit", "mypy", "coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)", "cryptography (>=3.3.1)", "zope.interface", "sphinx-rtd-theme", "sphinx"]
crypto = ["cryptography (>=3.3.1)"]
[[package]]
name = "pylint"
version = "2.14.5"
@ -655,7 +669,7 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
[metadata]
lock-version = "1.1"
python-versions = "~3.10"
content-hash = "e3a9a1b705fe06227a9e09263b2e84aa6ee1bc6e41096732f06715a109daca4e"
content-hash = "740e615c75f16d1d097c47b49a18fc3487be9541f257d5181d198693feb87380"
[metadata.files]
astroid = [
@ -958,6 +972,10 @@ pyasn1-modules = [
{file = "pyasn1_modules-0.2.8-py3.6.egg", hash = "sha256:cbac4bc38d117f2a49aeedec4407d23e8866ea4ac27ff2cf7fb3e5b570df19e0"},
{file = "pyasn1_modules-0.2.8-py3.7.egg", hash = "sha256:c29a5e5cc7a3f05926aff34e097e84f8589cd790ce0ed41b67aed6857b26aafd"},
]
pyjwt = [
{file = "PyJWT-2.4.0-py3-none-any.whl", hash = "sha256:72d1d253f32dbd4f5c88eaf1fdc62f3a19f676ccbadb9dbc5d07e951b2b26daf"},
{file = "PyJWT-2.4.0.tar.gz", hash = "sha256:d42908208c699b3b973cbeb01a969ba6a96c821eefb1c5bfe4c390c01d67abba"},
]
pylint = [
{file = "pylint-2.14.5-py3-none-any.whl", hash = "sha256:fabe30000de7d07636d2e82c9a518ad5ad7908590fe135ace169b44839c15f90"},
{file = "pylint-2.14.5.tar.gz", hash = "sha256:487ce2192eee48211269a0e976421f334cf94de1806ca9d0a99449adcdf0285e"},

View File

@ -21,6 +21,7 @@ google-cloud-logging = "^3.2.1"
google-cloud-pubsub = "^2.13.4"
google-cloud-tasks = "^2.10.0"
requests = "^2.28.1"
PyJWT = "^2.4.0"
spotframework = { git = "https://github.com/Sarsoo/spotframework.git" }
fmframework = { git = "https://github.com/Sarsoo/pyfmframework.git" }

33
tests/test_auth.py Normal file
View File

@ -0,0 +1,33 @@
from ast import Assert
from time import sleep
import unittest
from datetime import timedelta
from music.model.user import User
from music.auth.jwt_keys import generate_key, validate_key
class TestAuth(unittest.TestCase):
def test_encode_decode(self):
test_user = User.collection.filter('username', '==', "test").get()
key = generate_key(test_user, timedelta(minutes=10))
decoded = validate_key(key)
self.assertEqual(decoded["sub"], test_user.username)
def test_timeout(self):
test_user = User.collection.filter('username', '==', "test").get()
key = generate_key(test_user, timedelta(seconds=2))
decoded = validate_key(key)
self.assertIsNotNone(decoded)
sleep(2.5)
decoded = validate_key(key)
self.assertIsNone(decoded)