From 7272379a9c7a8b49671b95c353a15818aee10bbf Mon Sep 17 00:00:00 2001 From: aj Date: Thu, 8 Aug 2019 12:25:53 +0100 Subject: [PATCH] migrated to asynchronous tasks for staggering user and users playlist execution --- requirements.txt | 15 ++++---- spotify/api/api.py | 87 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 9 deletions(-) diff --git a/requirements.txt b/requirements.txt index c39c323..a003e55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,11 +3,12 @@ certifi==2019.6.16 chardet==3.0.4 Click==7.0 Flask==1.1.1 -google-api-core==1.14.0 +google-api-core==1.14.2 google-auth==1.6.3 -google-cloud-core==1.0.2 -google-cloud-firestore==1.3.0 +google-cloud-core==1.0.3 +google-cloud-firestore==1.4.0 google-cloud-pubsub==0.45.0 +google-cloud-tasks==1.2.0 googleapis-common-protos==1.6.0 grpc-google-iam-v1==0.12.3 grpcio==1.22.0 @@ -15,10 +16,10 @@ idna==2.8 itsdangerous==1.1.0 Jinja2==2.10.1 MarkupSafe==1.1.1 -protobuf==3.9.0 -pyasn1==0.4.5 -pyasn1-modules==0.2.5 -pytz==2019.1 +protobuf==3.9.1 +pyasn1==0.4.6 +pyasn1-modules==0.2.6 +pytz==2019.2 requests==2.22.0 rsa==4.0 six==1.12.0 diff --git a/spotify/api/api.py b/spotify/api/api.py index a20de09..149fae9 100644 --- a/spotify/api/api.py +++ b/spotify/api/api.py @@ -1,6 +1,12 @@ from flask import Blueprint, session, request, jsonify + +import datetime +import json + from google.cloud import firestore from google.cloud import pubsub_v1 +from google.cloud import tasks_v2 +from google.protobuf import timestamp_pb2 from werkzeug.security import check_password_hash, generate_password_hash import spotify.api.database as database @@ -8,6 +14,9 @@ import spotify.api.database as database blueprint = Blueprint('api', __name__) db = firestore.Client() publisher = pubsub_v1.PublisherClient() +tasker = tasks_v2.CloudTasksClient() + +task_path = tasker.queue_path('sarsooxyz', 'europe-west2', 'spotify-executions') run_playlist_topic_path = publisher.topic_path('sarsooxyz', 'run_user_playlist') @@ -309,6 +318,19 @@ def run_playlist(): return jsonify({'error': 'not logged in'}), 401 +@blueprint.route('/playlist/run/task', methods=['POST']) +def run_playlist_task(): + + if request.headers.get('X-AppEngine-QueueName', None): + payload = request.get_data(as_text=True) + if payload: + payload = json.loads(payload) + execute_playlist(payload['username'], payload['name']) + return jsonify({'message': 'executed playlist', 'status': 'success'}), 200 + else: + return jsonify({'error': 'unauthorized'}), 401 + + @blueprint.route('/playlist/run/user', methods=['GET']) def run_user(): @@ -327,6 +349,18 @@ def run_user(): return jsonify({'error': 'not logged in'}), 401 +@blueprint.route('/playlist/run/user/task', methods=['POST']) +def run_user_task(): + + if request.headers.get('X-AppEngine-QueueName', None): + payload = request.get_data(as_text=True) + if payload: + execute_user(payload) + return jsonify({'message': 'executed user', 'status': 'success'}), 200 + else: + return jsonify({'error': 'unauthorized'}), 401 + + @blueprint.route('/playlist/run/users', methods=['GET']) def run_users(): @@ -355,10 +389,32 @@ def run_users_cron(): def execute_all_users(): + seconds_delay = 0 + for iter_user in [i.to_dict() for i in db.collection(u'spotify_users').stream()]: if iter_user['spotify_linked'] and not iter_user['locked']: - execute_user(iter_user['username']) + + task = { + 'app_engine_http_request': { # Specify the type of request. + 'http_method': 'POST', + 'relative_uri': '/api/playlist/run/user/task', + 'body': iter_user['username'].encode() + } + } + + d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay) + + # Create Timestamp protobuf. + timestamp = timestamp_pb2.Timestamp() + timestamp.FromDatetime(d) + + # Add the timestamp to the tasks. + task['schedule_time'] = timestamp + + tasker.create_task(task_path, task) + + seconds_delay += 30 def execute_user(username): @@ -366,10 +422,37 @@ def execute_user(username): playlists = [i.to_dict() for i in database.get_user_playlists_collection(database.get_user_query_stream(username)[0].id).stream()] + seconds_delay = 0 + for iterate_playlist in playlists: if len(iterate_playlist['parts']) > 0 or len(iterate_playlist['playlist_references']) > 0: if iterate_playlist.get('playlist_id', None): - execute_playlist(username, iterate_playlist['name']) + + task = { + 'app_engine_http_request': { # Specify the type of request. + 'http_method': 'POST', + 'relative_uri': '/api/playlist/run/task', + 'body': json.dumps({ + 'username': username, + 'name': iterate_playlist['name'] + }).encode() + } + } + + d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay) + + # Create Timestamp protobuf. + timestamp = timestamp_pb2.Timestamp() + timestamp.FromDatetime(d) + + # Add the timestamp to the tasks. + task['schedule_time'] = timestamp + + tasker.create_task(task_path, task) + + seconds_delay += 10 + + # execute_playlist(username, iterate_playlist['name']) def execute_playlist(username, name):