added full cron stack of endpoints for refreshing stats
This commit is contained in:
parent
b54ef10541
commit
2ebad1fad6
@ -2,19 +2,23 @@ from flask import Blueprint, jsonify, request
|
|||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import datetime
|
||||||
|
|
||||||
from music.api.decorators import login_or_basic_auth, lastfm_username_required, spotify_link_required, cloud_task
|
from music.api.decorators import admin_required, login_or_basic_auth, lastfm_username_required, spotify_link_required, cloud_task, gae_cron
|
||||||
import music.db.database as database
|
import music.db.database as database
|
||||||
from music.tasks.refresh_lastfm_stats import refresh_lastfm_stats
|
from music.tasks.refresh_lastfm_stats import refresh_lastfm_stats
|
||||||
|
|
||||||
from spotfm.maths.counter import Counter
|
from spotfm.maths.counter import Counter
|
||||||
from spotframework.model.uri import Uri
|
from spotframework.model.uri import Uri
|
||||||
|
|
||||||
|
from google.cloud import firestore
|
||||||
from google.cloud import tasks_v2
|
from google.cloud import tasks_v2
|
||||||
|
from google.protobuf import timestamp_pb2
|
||||||
|
|
||||||
blueprint = Blueprint('spotfm-api', __name__)
|
blueprint = Blueprint('spotfm-api', __name__)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
db = firestore.Client()
|
||||||
tasker = tasks_v2.CloudTasksClient()
|
tasker = tasks_v2.CloudTasksClient()
|
||||||
task_path = tasker.queue_path('sarsooxyz', 'europe-west2', 'spotify-executions')
|
task_path = tasker.queue_path('sarsooxyz', 'europe-west2', 'spotify-executions')
|
||||||
|
|
||||||
@ -102,7 +106,116 @@ def run_playlist_task():
|
|||||||
return jsonify({'message': 'executed playlist', 'status': 'success'}), 200
|
return jsonify({'message': 'executed playlist', 'status': 'success'}), 200
|
||||||
|
|
||||||
|
|
||||||
def create_refresh_playlist_task(username, playlist_name):
|
@blueprint.route('/playlist/refresh/users', methods=['GET'])
|
||||||
|
@login_or_basic_auth
|
||||||
|
@admin_required
|
||||||
|
def run_users(username=None):
|
||||||
|
execute_all_users()
|
||||||
|
return jsonify({'message': 'executed all users', 'status': 'success'}), 200
|
||||||
|
|
||||||
|
|
||||||
|
@blueprint.route('/playlist/refresh/users/cron', methods=['GET'])
|
||||||
|
@gae_cron
|
||||||
|
def run_users_task():
|
||||||
|
execute_all_users()
|
||||||
|
return jsonify({'status': 'success'}), 200
|
||||||
|
|
||||||
|
|
||||||
|
@blueprint.route('/playlist/refresh/user', methods=['GET'])
|
||||||
|
@login_or_basic_auth
|
||||||
|
def run_user(username=None):
|
||||||
|
|
||||||
|
if database.get_user_doc_ref(username).get().to_dict()['type'] == 'admin':
|
||||||
|
user_name = request.args.get('username', username)
|
||||||
|
else:
|
||||||
|
user_name = username
|
||||||
|
|
||||||
|
execute_user(user_name)
|
||||||
|
|
||||||
|
return jsonify({'message': 'executed user', 'status': 'success'}), 200
|
||||||
|
|
||||||
|
|
||||||
|
@blueprint.route('/playlist/refresh/user/task', methods=['POST'])
|
||||||
|
@cloud_task
|
||||||
|
def run_user_task():
|
||||||
|
|
||||||
|
payload = request.get_data(as_text=True)
|
||||||
|
if payload:
|
||||||
|
execute_user(payload)
|
||||||
|
return jsonify({'message': 'executed user', 'status': 'success'}), 200
|
||||||
|
|
||||||
|
|
||||||
|
def execute_all_users():
|
||||||
|
|
||||||
|
seconds_delay = 0
|
||||||
|
logger.info('running')
|
||||||
|
|
||||||
|
for iter_user in [i.to_dict() for i in db.collection(u'spotify_users').stream()]:
|
||||||
|
|
||||||
|
if iter_user.get('spotify_linked') \
|
||||||
|
and iter_user.get('lastfm_username') \
|
||||||
|
and len(iter_user.get('lastfm_username')) > 0 \
|
||||||
|
and not iter_user['locked']:
|
||||||
|
|
||||||
|
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
|
||||||
|
create_refresh_user_task(username=iter_user.get('username'), delay=seconds_delay)
|
||||||
|
else:
|
||||||
|
execute_user(username=iter_user.get('username'))
|
||||||
|
|
||||||
|
seconds_delay += 2400
|
||||||
|
|
||||||
|
else:
|
||||||
|
logger.debug(f'skipping {iter_user.get("username")}')
|
||||||
|
|
||||||
|
|
||||||
|
def execute_user(username):
|
||||||
|
|
||||||
|
playlists = [i.to_dict() for i in
|
||||||
|
database.get_user_playlists_collection(database.get_user_query_stream(username)[0].id).stream()]
|
||||||
|
|
||||||
|
seconds_delay = 0
|
||||||
|
logger.info(f'running {username}')
|
||||||
|
|
||||||
|
user = database.get_user_doc_ref(username).get().to_dict()
|
||||||
|
|
||||||
|
if user.get('lastfm_username') and len(user.get('lastfm_username')) > 0:
|
||||||
|
for iterate_playlist in playlists:
|
||||||
|
if iterate_playlist.get('uri', None):
|
||||||
|
|
||||||
|
if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
|
||||||
|
create_refresh_playlist_task(username, iterate_playlist['name'], seconds_delay)
|
||||||
|
else:
|
||||||
|
refresh_lastfm_stats(username, iterate_playlist['name'])
|
||||||
|
|
||||||
|
seconds_delay += 1200
|
||||||
|
else:
|
||||||
|
logger.error('no last.fm username')
|
||||||
|
|
||||||
|
|
||||||
|
def create_refresh_user_task(username, delay=0):
|
||||||
|
|
||||||
|
task = {
|
||||||
|
'app_engine_http_request': { # Specify the type of request.
|
||||||
|
'http_method': 'POST',
|
||||||
|
'relative_uri': '/api/spotfm/playlist/refresh/user/task',
|
||||||
|
'body': username.encode()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if delay > 0:
|
||||||
|
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
|
||||||
|
|
||||||
|
# Create Timestamp protobuf.
|
||||||
|
timestamp = timestamp_pb2.Timestamp()
|
||||||
|
timestamp.FromDatetime(d)
|
||||||
|
|
||||||
|
# Add the timestamp to the tasks.
|
||||||
|
task['schedule_time'] = timestamp
|
||||||
|
|
||||||
|
tasker.create_task(task_path, task)
|
||||||
|
|
||||||
|
|
||||||
|
def create_refresh_playlist_task(username, playlist_name, delay=0):
|
||||||
|
|
||||||
task = {
|
task = {
|
||||||
'app_engine_http_request': { # Specify the type of request.
|
'app_engine_http_request': { # Specify the type of request.
|
||||||
@ -115,4 +228,15 @@ def create_refresh_playlist_task(username, playlist_name):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if delay > 0:
|
||||||
|
|
||||||
|
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
|
||||||
|
|
||||||
|
# Create Timestamp protobuf.
|
||||||
|
timestamp = timestamp_pb2.Timestamp()
|
||||||
|
timestamp.FromDatetime(d)
|
||||||
|
|
||||||
|
# Add the timestamp to the tasks.
|
||||||
|
task['schedule_time'] = timestamp
|
||||||
|
|
||||||
tasker.create_task(task_path, task)
|
tasker.create_task(task_path, task)
|
||||||
|
Loading…
Reference in New Issue
Block a user