"""Functions for creating GCP Cloud Tasks for long running operatings """ import datetime import json import os import logging from google.cloud import tasks_v2 from google.protobuf import timestamp_pb2 from music.tasks.run_user_playlist import run_user_playlist from music.tasks.refresh_lastfm_stats import refresh_lastfm_track_stats from music.model.user import User from music.model.playlist import Playlist from music.model.tag import Tag tasker = tasks_v2.CloudTasksClient() task_path = tasker.queue_path('sarsooxyz', 'europe-west2', 'spotify-executions') logger = logging.getLogger(__name__) def update_all_user_playlists(): """Create user playlist refresh task for all users""" seconds_delay = 0 logger.info('running') for iter_user in User.collection.fetch(): if iter_user.spotify_linked and not iter_user.locked: task = { 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_uri': '/api/playlist/run/user/task', 'body': iter_user.username.encode() } } d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay) timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) task['schedule_time'] = timestamp tasker.create_task(parent=task_path, task=task) seconds_delay += 30 def update_playlists(username: str): """Refresh all playlists for given user, environment dependent Args: username (str): Subject user's username """ user = User.collection.filter('username', '==', username.strip().lower()).get() if user is None: logger.error(f'user {username} not found') return playlists = Playlist.collection.parent(user.key).fetch() seconds_delay = 0 logger.info(f'running {username}') for iterate_playlist in playlists: if iterate_playlist.uri is not None: if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD': run_user_playlist_task(username, iterate_playlist.name, seconds_delay) else: run_user_playlist(user, iterate_playlist) seconds_delay += 6 def run_user_playlist_task(username: str, playlist_name: str, delay: int = 0): """Create tasks for a users given playlist Args: username (str): Subject user's username playlist_name (str): Subject playlist name delay (int, optional): Seconds to delay execution by. Defaults to 0. """ task = { 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_uri': '/api/playlist/run/task', 'body': json.dumps({ 'username': username, 'name': playlist_name }).encode() } } if delay > 0: d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay) # Create Timestamp protobuf. timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) # Add the timestamp to the tasks. task['schedule_time'] = timestamp tasker.create_task(parent=task_path, task=task) def refresh_all_user_playlist_stats(): """"Create user playlist stats refresh task for all users""" seconds_delay = 0 logger.info('running') for iter_user in User.collection.fetch(): if iter_user.spotify_linked and iter_user.lastfm_username and \ len(iter_user.lastfm_username) > 0 and not iter_user.locked: if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD': refresh_user_stats_task(username=iter_user.username, delay=seconds_delay) else: refresh_user_playlist_stats(username=iter_user.username) seconds_delay += 2400 else: logger.debug(f'skipping {iter_user.username}') def refresh_user_playlist_stats(username: str): """Refresh all playlist stats for given user, environment dependent Args: username (str): Subject user's username """ user = User.collection.filter('username', '==', username.strip().lower()).get() if user is None: logger.error(f'user {username} not found') return playlists = Playlist.collection.parent(user.key).fetch() seconds_delay = 0 logger.info(f'running stats for {username}') if user.lastfm_username and len(user.lastfm_username) > 0: for playlist in playlists: if playlist.uri is not None: if os.environ.get('DEPLOY_DESTINATION', None) == 'PROD': refresh_playlist_task(username, playlist.name, seconds_delay) else: refresh_lastfm_track_stats(username, playlist.name) seconds_delay += 1200 else: logger.error('no last.fm username') def refresh_user_stats_task(username: str, delay: int = 0): """Create user playlist stats refresh task Args: username (str): Subject user's username delay (int, optional): Seconds to delay execution by. Defaults to 0. """ task = { 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_uri': '/api/spotfm/playlist/refresh/user/task', 'body': username.encode() } } if delay > 0: d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay) timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) task['schedule_time'] = timestamp tasker.create_task(parent=task_path, task=task) def refresh_playlist_task(username: str, playlist_name: str, delay: int = 0): """Create user playlist stats refresh tasks Args: username (str): Subject user's username playlist_name (str): Subject playlist name delay (int, optional): Seconds to delay execution by. Defaults to 0. """ track_task = { 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_uri': '/api/spotfm/playlist/refresh/task/track', 'body': json.dumps({ 'username': username, 'name': playlist_name }).encode() } } if delay > 0: d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay) timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) track_task['schedule_time'] = timestamp album_task = { 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_uri': '/api/spotfm/playlist/refresh/task/album', 'body': json.dumps({ 'username': username, 'name': playlist_name }).encode() } } d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay + 180) timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) album_task['schedule_time'] = timestamp artist_task = { 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_uri': '/api/spotfm/playlist/refresh/task/artist', 'body': json.dumps({ 'username': username, 'name': playlist_name }).encode() } } d = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay + 360) timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) artist_task['schedule_time'] = timestamp tasker.create_task(parent=task_path, task=track_task) tasker.create_task(parent=task_path, task=album_task) tasker.create_task(parent=task_path, task=artist_task) def update_all_user_tags(): """Create user tag refresh task for all users""" seconds_delay = 0 logger.info('running') for iter_user in User.collection.fetch(): if iter_user.lastfm_username and len(iter_user.lastfm_username) > 0 and not iter_user.locked: for tag in Tag.collection.parent(iter_user.key).fetch(): task = { 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_uri': '/api/tag/update/task', 'body': json.dumps({ 'username': iter_user.username, 'tag_id': tag.tag_id }).encode() } } d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_delay) timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) task['schedule_time'] = timestamp tasker.create_task(parent=task_path, task=task) seconds_delay += 10