migrated to asynchronous tasks for staggering user and users playlist execution

This commit is contained in:
aj 2019-08-08 12:25:53 +01:00
parent 61ac85d41f
commit 7272379a9c
2 changed files with 93 additions and 9 deletions

View File

@ -3,11 +3,12 @@ certifi==2019.6.16
chardet==3.0.4 chardet==3.0.4
Click==7.0 Click==7.0
Flask==1.1.1 Flask==1.1.1
google-api-core==1.14.0 google-api-core==1.14.2
google-auth==1.6.3 google-auth==1.6.3
google-cloud-core==1.0.2 google-cloud-core==1.0.3
google-cloud-firestore==1.3.0 google-cloud-firestore==1.4.0
google-cloud-pubsub==0.45.0 google-cloud-pubsub==0.45.0
google-cloud-tasks==1.2.0
googleapis-common-protos==1.6.0 googleapis-common-protos==1.6.0
grpc-google-iam-v1==0.12.3 grpc-google-iam-v1==0.12.3
grpcio==1.22.0 grpcio==1.22.0
@ -15,10 +16,10 @@ idna==2.8
itsdangerous==1.1.0 itsdangerous==1.1.0
Jinja2==2.10.1 Jinja2==2.10.1
MarkupSafe==1.1.1 MarkupSafe==1.1.1
protobuf==3.9.0 protobuf==3.9.1
pyasn1==0.4.5 pyasn1==0.4.6
pyasn1-modules==0.2.5 pyasn1-modules==0.2.6
pytz==2019.1 pytz==2019.2
requests==2.22.0 requests==2.22.0
rsa==4.0 rsa==4.0
six==1.12.0 six==1.12.0

View File

@ -1,6 +1,12 @@
from flask import Blueprint, session, request, jsonify from flask import Blueprint, session, request, jsonify
import datetime
import json
from google.cloud import firestore from google.cloud import firestore
from google.cloud import pubsub_v1 from google.cloud import pubsub_v1
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
import spotify.api.database as database import spotify.api.database as database
@ -8,6 +14,9 @@ import spotify.api.database as database
blueprint = Blueprint('api', __name__) blueprint = Blueprint('api', __name__)
db = firestore.Client() db = firestore.Client()
publisher = pubsub_v1.PublisherClient() publisher = pubsub_v1.PublisherClient()
tasker = tasks_v2.CloudTasksClient()
task_path = tasker.queue_path('sarsooxyz', 'europe-west2', 'spotify-executions')
run_playlist_topic_path = publisher.topic_path('sarsooxyz', 'run_user_playlist') run_playlist_topic_path = publisher.topic_path('sarsooxyz', 'run_user_playlist')
@ -309,6 +318,19 @@ def run_playlist():
return jsonify({'error': 'not logged in'}), 401 return jsonify({'error': 'not logged in'}), 401
@blueprint.route('/playlist/run/task', methods=['POST'])
def run_playlist_task():
if request.headers.get('X-AppEngine-QueueName', None):
payload = request.get_data(as_text=True)
if payload:
payload = json.loads(payload)
execute_playlist(payload['username'], payload['name'])
return jsonify({'message': 'executed playlist', 'status': 'success'}), 200
else:
return jsonify({'error': 'unauthorized'}), 401
@blueprint.route('/playlist/run/user', methods=['GET']) @blueprint.route('/playlist/run/user', methods=['GET'])
def run_user(): def run_user():
@ -327,6 +349,18 @@ def run_user():
return jsonify({'error': 'not logged in'}), 401 return jsonify({'error': 'not logged in'}), 401
@blueprint.route('/playlist/run/user/task', methods=['POST'])
def run_user_task():
if request.headers.get('X-AppEngine-QueueName', None):
payload = request.get_data(as_text=True)
if payload:
execute_user(payload)
return jsonify({'message': 'executed user', 'status': 'success'}), 200
else:
return jsonify({'error': 'unauthorized'}), 401
@blueprint.route('/playlist/run/users', methods=['GET']) @blueprint.route('/playlist/run/users', methods=['GET'])
def run_users(): def run_users():
@ -355,10 +389,32 @@ def run_users_cron():
def execute_all_users(): def execute_all_users():
seconds_delay = 0
for iter_user in [i.to_dict() for i in db.collection(u'spotify_users').stream()]: for iter_user in [i.to_dict() for i in db.collection(u'spotify_users').stream()]:
if iter_user['spotify_linked'] and not iter_user['locked']: if iter_user['spotify_linked'] and not iter_user['locked']:
execute_user(iter_user['username'])
task = {
'app_engine_http_request': { # Specify the type of request.
'http_method': 'POST',
'relative_uri': '/api/playlist/run/user/task',
'body': iter_user['username'].encode()
}
}
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay)
# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d)
# Add the timestamp to the tasks.
task['schedule_time'] = timestamp
tasker.create_task(task_path, task)
seconds_delay += 30
def execute_user(username): def execute_user(username):
@ -366,10 +422,37 @@ def execute_user(username):
playlists = [i.to_dict() for i in playlists = [i.to_dict() for i in
database.get_user_playlists_collection(database.get_user_query_stream(username)[0].id).stream()] database.get_user_playlists_collection(database.get_user_query_stream(username)[0].id).stream()]
seconds_delay = 0
for iterate_playlist in playlists: for iterate_playlist in playlists:
if len(iterate_playlist['parts']) > 0 or len(iterate_playlist['playlist_references']) > 0: if len(iterate_playlist['parts']) > 0 or len(iterate_playlist['playlist_references']) > 0:
if iterate_playlist.get('playlist_id', None): if iterate_playlist.get('playlist_id', None):
execute_playlist(username, iterate_playlist['name'])
task = {
'app_engine_http_request': { # Specify the type of request.
'http_method': 'POST',
'relative_uri': '/api/playlist/run/task',
'body': json.dumps({
'username': username,
'name': iterate_playlist['name']
}).encode()
}
}
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay)
# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d)
# Add the timestamp to the tasks.
task['schedule_time'] = timestamp
tasker.create_task(task_path, task)
seconds_delay += 10
# execute_playlist(username, iterate_playlist['name'])
def execute_playlist(username, name): def execute_playlist(username, name):