breaking down run user playlist task

This commit is contained in:
Andy Pack 2023-10-13 22:28:19 +01:00
parent 75c26e78eb
commit 2ac7b43f5c
Signed by: sarsoo
GPG Key ID: A55BA3536A5E0ED7

View File

@ -1,6 +1,7 @@
import datetime import datetime
import logging import logging
import random import random
from typing import List
import spotframework.util.monthstrings as monthstrings import spotframework.util.monthstrings as monthstrings
from spotframework.model.uri import Uri from spotframework.model.uri import Uri
@ -24,6 +25,27 @@ from music.notif.notifier import notify_user_playlist_update
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_user_and_name(user):
if isinstance(user, str):
username = user
user = User.collection.filter('username', '==', username.strip().lower()).get()
return user, username
else:
return user, user.username
def get_playlist_and_name(playlist, user: User):
if isinstance(playlist, str):
playlist_name = playlist
playlist = user.get_playlist(playlist_name)
return playlist, playlist_name
else:
return playlist, playlist.name
def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = None, fmnet: Network = None) -> None: def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = None, fmnet: Network = None) -> None:
"""Generate and upadate a user's smart playlist """Generate and upadate a user's smart playlist
@ -46,22 +68,13 @@ def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = Non
# PRE-RUN CHECKS # PRE-RUN CHECKS
if isinstance(user, str): user, username = get_user_and_name(user)
username = user
user = User.collection.filter('username', '==', username.strip().lower()).get()
else:
username = user.username
if user is None: if user is None:
logger.error(f'user {username} not found') logger.error(f'user {username} not found')
raise NameError(f'User {username} not found') raise NameError(f'User {username} not found')
if isinstance(playlist, str): playlist, playlist_name = get_playlist_and_name(playlist, user)
playlist_name = playlist
playlist = user.get_playlist(playlist_name)
else:
playlist_name = playlist.name
if playlist.uri is None: if playlist.uri is None:
logger.critical(f'no playlist id to populate {username} / {playlist_name}') logger.critical(f'no playlist id to populate {username} / {playlist_name}')
@ -78,18 +91,36 @@ def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = Non
logger.error(f'no spotify network returned for {username} / {playlist_name}') logger.error(f'no spotify network returned for {username} / {playlist_name}')
raise NameError(f'No Spotify network returned ({username} / {playlist_name})') raise NameError(f'No Spotify network returned ({username} / {playlist_name})')
try:
if not playlist.include_spotify_owned:
user_playlists = {i.name: i.uri for i in spotnet.playlists() if 'spotify' not in i.owner.display_name.lower()}
else:
user_playlists = {i.name: i.uri for i in spotnet.playlists()}
except SpotifyNetworkException as e:
logger.exception(f'error occured while retrieving playlists {username} / {playlist_name}')
raise e
part_generator = PartGenerator(user=user) part_generator = PartGenerator(user=user)
part_names = part_generator.get_recursive_parts(playlist.name) part_names = part_generator.get_recursive_parts(playlist.name)
playlist_tracks = load_playlist_tracks(spotnet, playlist, part_names, username)
playlist_tracks = do_playlist_type_processing(spotnet, playlist, user, playlist_tracks)
playlist_tracks = sort_tracks(playlist, playlist_tracks)
playlist_tracks += get_recommendations(spotnet, playlist, username, playlist_tracks)
playlist_tracks = deduplicate_by_name(playlist_tracks)
execute_playlist(spotnet, playlist, part_names, username, playlist_tracks)
playlist.last_updated = datetime.datetime.utcnow()
playlist.update()
notify_user_playlist_update(user=user, playlist=playlist)
def load_user_playlists(spotnet: SpotNetwork, playlist: Playlist, username: str):
try:
if not playlist.include_spotify_owned:
return {i.name: i.uri
for i in spotnet.playlists()
if 'spotify' not in i.owner.display_name.lower()}
else:
return {i.name: i.uri
for i in spotnet.playlists()}
except SpotifyNetworkException as e:
logger.exception(f'error occured while retrieving playlists {username} / {playlist.name}')
raise e
def load_playlist_tracks(spotnet: SpotNetwork, playlist: Playlist, part_names: List[str], username: str):
playlist_tracks = [] playlist_tracks = []
if playlist.add_last_month: if playlist.add_last_month:
@ -97,6 +128,8 @@ def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = Non
if playlist.add_this_month: if playlist.add_this_month:
part_names.append(monthstrings.get_this_month()) part_names.append(monthstrings.get_this_month())
user_playlists = load_user_playlists(spotnet, playlist, username)
# LOAD PLAYLIST TRACKS # LOAD PLAYLIST TRACKS
for part_name in part_names: for part_name in part_names:
try: # attempt to cast to uri try: # attempt to cast to uri
@ -106,9 +139,9 @@ def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = Non
except ValueError: # is a playlist name except ValueError: # is a playlist name
uri = user_playlists.get(part_name) uri = user_playlists.get(part_name)
if uri is None: if uri is None:
logger.warning(f'playlist {part_name} not found {username} / {playlist_name}') logger.warning(f'playlist {part_name} not found {username} / {playlist.name}')
continue continue
log_name = part_name log_name = part_name
try: try:
@ -116,85 +149,105 @@ def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = Non
if _tracks and len(_tracks) > 0: if _tracks and len(_tracks) > 0:
playlist_tracks += _tracks playlist_tracks += _tracks
else: else:
logger.warning(f'no tracks returned for {log_name} {username} / {playlist_name}') logger.warning(f'no tracks returned for {log_name} {username} / {playlist.name}')
except SpotifyNetworkException: except SpotifyNetworkException:
logger.exception(f'error occured while retrieving {log_name} {username} / {playlist_name}') logger.exception(f'error occured while retrieving {log_name} {username} / {playlist.name}')
playlist_tracks = list(remove_local(playlist_tracks)) playlist_tracks = list(remove_local(playlist_tracks))
# LIBRARY playlist_tracks += load_library_tracks(spotnet, playlist, username)
return playlist_tracks
def load_library_tracks(spotnet: SpotNetwork, playlist: Playlist, username: str):
tracks = []
if playlist.include_library_tracks: if playlist.include_library_tracks:
try: try:
library_tracks = spotnet.saved_tracks() library_tracks = spotnet.saved_tracks()
if library_tracks and len(library_tracks) > 0: if library_tracks and len(library_tracks) > 0:
playlist_tracks += library_tracks tracks += library_tracks
else: else:
logger.error(f'error getting library tracks {username} / {playlist_name}') logger.error(f'error getting library tracks {username} / {playlist.name}')
except SpotifyNetworkException: except SpotifyNetworkException:
logger.exception(f'error occured while retrieving library tracks {username} / {playlist_name}') logger.exception(f'error occured while retrieving library tracks {username} / {playlist.name}')
# PLAYLIST TYPE SPECIFIC return tracks
def do_playlist_type_processing(spotnet: SpotNetwork, playlist: Playlist, user: User, current_tracks: List):
if playlist.type == 'recents': if playlist.type == 'recents':
boundary_date = datetime.datetime.now(datetime.timezone.utc) - \ return do_recents_processing(playlist, current_tracks)
datetime.timedelta(days=int(playlist.day_boundary))
playlist_tracks = list(added_after(playlist_tracks, boundary_date))
elif playlist.type == 'fmchart': elif playlist.type == 'fmchart':
if user.lastfm_username is None: return do_lastfm_chart_processing(spotnet, playlist, user, current_tracks)
logger.error(f'no associated last.fm username, chart source skipped {username} / {playlist_name}') return current_tracks
else:
chart_range = Network.Range.MONTH
try:
chart_range = Network.Range[playlist.chart_range]
except KeyError:
logger.error(f'invalid last.fm chart range found {playlist.chart_range}, '
f'defaulting to 1 month {username} / {playlist_name}')
if fmnet is None:
fmnet = database.get_authed_lastfm_network(user)
if fmnet is not None: def do_recents_processing(playlist: Playlist, current_tracks: List):
chart_tracks = map_lastfm_track_chart_to_spotify(spotnet=spotnet, boundary_date = datetime.datetime.now(datetime.timezone.utc) - \
fmnet=fmnet, datetime.timedelta(days=int(playlist.day_boundary))
period=chart_range, return list(added_after(current_tracks, boundary_date))
limit=playlist.chart_limit)
if chart_tracks is not None and len(chart_tracks) > 0: def do_lastfm_chart_processing(spotnet: SpotNetwork, playlist: Playlist, user: User, current_tracks: List):
playlist_tracks += chart_tracks if user.lastfm_username is None:
else: logger.error(f'no associated last.fm username, chart source skipped {user.username} / {playlist.name}')
logger.error(f'no tracks returned {username} / {playlist_name}') else:
chart_range = Network.Range.MONTH
try:
chart_range = Network.Range[playlist.chart_range]
except KeyError:
logger.error(f'invalid last.fm chart range found {playlist.chart_range}, '
f'defaulting to 1 month {user.username} / {playlist.name}')
fmnet = database.get_authed_lastfm_network(user)
if fmnet is not None:
chart_tracks = map_lastfm_track_chart_to_spotify(spotnet=spotnet,
fmnet=fmnet,
period=chart_range,
limit=playlist.chart_limit)
if chart_tracks is not None and len(chart_tracks) > 0:
current_tracks += chart_tracks
else: else:
logger.error(f'no last.fm network returned {username} / {playlist_name}') logger.error(f'no tracks returned {user.username} / {playlist.name}')
else:
logger.error(f'no last.fm network returned {user.username} / {playlist.name}')
# SORT METHOD return current_tracks
def sort_tracks(playlist: Playlist, current_tracks: List):
if playlist.shuffle: if playlist.shuffle:
random.shuffle(playlist_tracks) random.shuffle(current_tracks)
return current_tracks
elif playlist.type != 'fmchart': elif playlist.type != 'fmchart':
playlist_tracks = sort_by_release_date(tracks=playlist_tracks, reverse=True) return sort_by_release_date(tracks=current_tracks, reverse=True)
return current_tracks
def get_recommendations(spotnet: SpotNetwork, playlist: Playlist, username: str, current_tracks: List):
recommendations = []
# RECOMMENDATIONS
if playlist.include_recommendations: if playlist.include_recommendations:
try: try:
recommendations = spotnet.recommendations(tracks=[i.uri.object_id for i, j recommendations = spotnet.recommendations(tracks=[i.uri.object_id for i, j
in get_track_objects( in get_track_objects(
random.sample(playlist_tracks, random.sample(current_tracks,
k=min(5, len(playlist_tracks)) k=min(5, len(current_tracks))
) )
) )
if i.uri.object_type == Uri.ObjectType.track], if i.uri.object_type == Uri.ObjectType.track],
response_limit=playlist.recommendation_sample) response_limit=playlist.recommendation_sample)
if recommendations and len(recommendations.tracks) > 0: if recommendations and len(recommendations.tracks) > 0:
playlist_tracks += recommendations.tracks recommendations = recommendations.tracks
else: else:
logger.error(f'error getting recommendations {username} / {playlist_name}') logger.error(f'error getting recommendations {username} / {playlist.name}')
except SpotifyNetworkException: except SpotifyNetworkException:
logger.exception(f'error occured while generating recommendations {username} / {playlist_name}') logger.exception(f'error occured while generating recommendations {username} / {playlist.name}')
# DEDUPLICATE return recommendations
playlist_tracks = deduplicate_by_name(playlist_tracks)
# EXECUTE def execute_playlist(spotnet: SpotNetwork, playlist: Playlist, part_names, username: str, current_tracks: List):
try: try:
spotnet.replace_playlist_tracks(uri=playlist.uri, uris=[i.uri for i, j in get_track_objects(playlist_tracks)]) spotnet.replace_playlist_tracks(uri=playlist.uri, uris=[i.uri for i, j in get_track_objects(current_tracks)])
if playlist.description_overwrite: if playlist.description_overwrite:
string = playlist.description_overwrite string = playlist.description_overwrite
@ -205,18 +258,13 @@ def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = Non
string += f' - {str(playlist.description_suffix)}' string += f' - {str(playlist.description_suffix)}'
if string is None or len(string) == 0: if string is None or len(string) == 0:
logger.error(f'no string generated {username} / {playlist_name}') logger.error(f'no string generated {username} / {playlist.name}')
return None return None
try: try:
spotnet.change_playlist_details(uri=playlist.uri, description=string) spotnet.change_playlist_details(uri=playlist.uri, description=string)
except SpotifyNetworkException: except SpotifyNetworkException:
logger.exception(f'error changing description for {username} / {playlist_name}') logger.exception(f'error changing description for {username} / {playlist.name}')
except SpotifyNetworkException: except SpotifyNetworkException:
logger.exception(f'error executing {username} / {playlist_name}') logger.exception(f'error executing {username} / {playlist.name}')
playlist.last_updated = datetime.datetime.utcnow()
playlist.update()
notify_user_playlist_update(user=user, playlist=playlist)