Mixonomer/music/cloud/tasks.py

292 lines
8.9 KiB
Python

"""Functions for creating GCP Cloud Tasks for long running operatings
"""
import datetime
import json
import os
import logging
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
from music.tasks.run_user_playlist import run_user_playlist
from music.tasks.refresh_lastfm_stats import refresh_lastfm_track_stats
from music.model.user import User
from music.model.playlist import Playlist
from music.model.tag import Tag
# Cloud Tasks client, created once at import time and shared by every function in this module.
tasker = tasks_v2.CloudTasksClient()
# Path of the target queue: project taken from the GOOGLE_CLOUD_PROJECT environment
# variable (a KeyError is raised at import if it is unset), location 'europe-west2',
# queue name 'spotify-executions'.
task_path = tasker.queue_path(os.environ['GOOGLE_CLOUD_PROJECT'], 'europe-west2', 'spotify-executions')
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def update_all_user_playlists():
    """Queue a playlist refresh Cloud Task for every eligible user.

    A user is eligible when their Spotify account is linked and they are not
    locked. Every task carries an explicit schedule time: the first is
    scheduled for the current UTC time and each following one 30 seconds
    later than the previous, so the tasks are staggered.
    """
    logger.info('running')

    offset_seconds = 0
    for candidate in User.collection.fetch():
        # Skip users without a linked Spotify account and users who are locked.
        if not candidate.spotify_linked or candidate.locked:
            continue

        request = {  # Specify the type of request.
            'http_method': 'POST',
            'relative_uri': '/api/playlist/run/user/task',
            'body': candidate.username.encode(),
        }

        # Naive UTC datetime converted into a protobuf Timestamp.
        run_at = timestamp_pb2.Timestamp()
        run_at.FromDatetime(
            datetime.datetime.utcnow() + datetime.timedelta(seconds=offset_seconds)
        )

        tasker.create_task(
            parent=task_path,
            task={'app_engine_http_request': request, 'schedule_time': run_at},
        )
        offset_seconds += 30
def update_playlists(username: str):
    """Refresh every playlist of one user, either via Cloud Tasks or inline.

    The user is looked up by the stripped, lower-cased username. Playlists
    without a URI are ignored. When the ``DEPLOY_DESTINATION`` environment
    variable equals ``'PROD'`` a Cloud Task is created per playlist, each
    delayed 6 seconds more than the previous one; otherwise the playlist is
    run directly in this process.

    Args:
        username (str): Subject user's username
    """
    lookup_name = username.strip().lower()
    user = User.collection.filter('username', '==', lookup_name).get()
    if user is None:
        logger.error(f'user {username} not found')
        return

    user_playlists = Playlist.collection.parent(user.key).fetch()
    stagger = 0
    logger.info(f'running {username}')

    for candidate in user_playlists:
        # Playlists with no URI are skipped entirely.
        if candidate.uri is None:
            continue

        in_production = os.environ.get('DEPLOY_DESTINATION', None) == 'PROD'
        if in_production:
            run_user_playlist_task(username, candidate.name, stagger)
        else:
            run_user_playlist(user, candidate)

        # The delay advances for every processed playlist, in either mode.
        stagger += 6
def run_user_playlist_task(username: str, playlist_name: str, delay: int = 0):
    """Queue a Cloud Task that runs a single playlist of a user.

    The task POSTs a JSON body containing the username and playlist name to
    ``/api/playlist/run/task``. A schedule time is attached only when
    ``delay`` is positive.

    Args:
        username (str): Subject user's username
        playlist_name (str): Subject playlist name
        delay (int, optional): Seconds to delay execution by. Defaults to 0.
    """
    payload = json.dumps({
        'username': username,
        'name': playlist_name
    }).encode()

    task = {
        'app_engine_http_request': {  # Specify the type of request.
            'http_method': 'POST',
            'relative_uri': '/api/playlist/run/task',
            'body': payload,
        }
    }

    if delay > 0:
        # Build the protobuf Timestamp for "now (UTC) + delay" and attach it.
        run_at = timestamp_pb2.Timestamp()
        run_at.FromDatetime(
            datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
        )
        task['schedule_time'] = run_at

    tasker.create_task(parent=task_path, task=task)
def refresh_all_user_playlist_stats():
    """Start a playlist stats refresh for every eligible user.

    A user is eligible when Spotify is linked, a non-empty Last.fm username
    is set and the user is not locked; everyone else is logged at debug level
    and skipped. When ``DEPLOY_DESTINATION`` equals ``'PROD'`` a Cloud Task is
    created per user, each delayed 2400 seconds more than the previous one;
    otherwise the refresh is run directly in this process.
    """
    stagger = 0
    logger.info('running')

    for iter_user in User.collection.fetch():
        eligible = (
            iter_user.spotify_linked
            and iter_user.lastfm_username
            and len(iter_user.lastfm_username) > 0
            and not iter_user.locked
        )
        if not eligible:
            logger.debug(f'skipping {iter_user.username}')
            continue

        if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
            refresh_user_stats_task(username=iter_user.username, delay=stagger)
        else:
            refresh_user_playlist_stats(username=iter_user.username)

        # The delay advances only for users that were actually processed.
        stagger += 2400
def refresh_user_playlist_stats(username: str):
    """Refresh stats for every playlist of one user, via Cloud Tasks or inline.

    The user is looked up by the stripped, lower-cased username. If the user
    has no Last.fm username an error is logged and nothing is refreshed.
    Playlists without a URI are ignored. When ``DEPLOY_DESTINATION`` equals
    ``'PROD'`` refresh tasks are created per playlist, each delayed 1200
    seconds more than the previous one; otherwise the Last.fm track stats are
    refreshed directly in this process.

    Args:
        username (str): Subject user's username
    """
    lookup_name = username.strip().lower()
    user = User.collection.filter('username', '==', lookup_name).get()
    if user is None:
        logger.error(f'user {username} not found')
        return

    user_playlists = Playlist.collection.parent(user.key).fetch()
    stagger = 0
    logger.info(f'running stats for {username}')

    has_lastfm = user.lastfm_username and len(user.lastfm_username) > 0
    if not has_lastfm:
        logger.error('no last.fm username')
        return

    for candidate in user_playlists:
        # Playlists with no URI are skipped entirely.
        if candidate.uri is None:
            continue

        if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD':
            refresh_playlist_task(username, candidate.name, stagger)
        else:
            refresh_lastfm_track_stats(username, candidate.name)

        stagger += 1200
def refresh_user_stats_task(username: str, delay: int = 0):
    """Queue a Cloud Task that refreshes playlist stats for one user.

    The task POSTs the encoded username to
    ``/api/spotfm/playlist/refresh/user/task``. A schedule time is attached
    only when ``delay`` is positive.

    Args:
        username (str): Subject user's username
        delay (int, optional): Seconds to delay execution by. Defaults to 0.
    """
    request = {  # Specify the type of request.
        'http_method': 'POST',
        'relative_uri': '/api/spotfm/playlist/refresh/user/task',
        'body': username.encode(),
    }
    task = {'app_engine_http_request': request}

    if delay > 0:
        # Build the protobuf Timestamp for "now (UTC) + delay" and attach it.
        run_at = timestamp_pb2.Timestamp()
        run_at.FromDatetime(
            datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
        )
        task['schedule_time'] = run_at

    tasker.create_task(parent=task_path, task=task)
def refresh_playlist_task(username: str, playlist_name: str, delay: int = 0):
    """Create the track, album and artist stats refresh tasks for one playlist.

    Three Cloud Tasks are built and then submitted in this order:

    * track  - no schedule time is set unless ``delay`` is positive, in which
      case it is scheduled ``delay`` seconds from now
    * album  - always scheduled ``delay + 180`` seconds from now
    * artist - always scheduled ``delay + 360`` seconds from now

    Each task POSTs the same JSON body (username and playlist name) to its
    own endpoint. All three tasks are fully built before the first one is
    submitted.

    Args:
        username (str): Subject user's username
        playlist_name (str): Subject playlist name
        delay (int, optional): Seconds to delay execution by. Defaults to 0.
    """
    # (endpoint, extra seconds added on top of ``delay``), in submission order.
    stages = (
        ('/api/spotfm/playlist/refresh/task/track', 0),
        ('/api/spotfm/playlist/refresh/task/album', 180),
        ('/api/spotfm/playlist/refresh/task/artist', 360),
    )

    # The request body is identical for all three stages, so encode it once.
    payload = json.dumps({
        'username': username,
        'name': playlist_name
    }).encode()

    pending = []
    for relative_uri, offset in stages:
        task = {
            'app_engine_http_request': {  # Specify the type of request.
                'http_method': 'POST',
                'relative_uri': relative_uri,
                'body': payload,
            }
        }

        # Stages with an offset are always scheduled; the un-offset track
        # stage is scheduled only when the caller asked for a positive delay.
        if offset > 0 or delay > 0:
            run_at = timestamp_pb2.Timestamp()
            run_at.FromDatetime(
                datetime.datetime.utcnow() + datetime.timedelta(seconds=delay + offset)
            )
            task['schedule_time'] = run_at

        pending.append(task)

    # Submit only after every task has been built successfully.
    for task in pending:
        tasker.create_task(parent=task_path, task=task)
def update_all_user_tags():
    """Queue a tag update Cloud Task for every tag of every eligible user.

    A user is eligible when a non-empty Last.fm username is set and the user
    is not locked. Every task carries an explicit schedule time: the first is
    scheduled for the current UTC time and each following one, across all
    users, 10 seconds later than the previous.
    """
    offset_seconds = 0
    logger.info('running')

    for candidate in User.collection.fetch():
        has_lastfm = candidate.lastfm_username and len(candidate.lastfm_username) > 0
        # Skip users without a Last.fm username and users who are locked.
        if not (has_lastfm and not candidate.locked):
            continue

        for user_tag in Tag.collection.parent(candidate.key).fetch():
            request = {  # Specify the type of request.
                'http_method': 'POST',
                'relative_uri': '/api/tag/update/task',
                'body': json.dumps({
                    'username': candidate.username,
                    'tag_id': user_tag.tag_id
                }).encode(),
            }

            # Naive UTC datetime converted into a protobuf Timestamp.
            run_at = timestamp_pb2.Timestamp()
            run_at.FromDatetime(
                datetime.datetime.utcnow() + datetime.timedelta(seconds=offset_seconds)
            )

            tasker.create_task(
                parent=task_path,
                task={'app_engine_http_request': request, 'schedule_time': run_at},
            )
            offset_seconds += 10