Mixonomer/music/tasks/run_user_playlist.py

270 lines
10 KiB
Python

import datetime
import logging
import random
from typing import List
import spotframework.util.monthstrings as monthstrings
from spotframework.model.uri import Uri
from spotframework.filter import remove_local, get_track_objects
from spotframework.filter.added import added_after
from spotframework.filter.sort import sort_by_release_date
from spotframework.filter.deduplicate import deduplicate_by_name
from spotframework.net.network import SpotifyNetworkException
from spotframework.net.network import Network as SpotNetwork
from fmframework.net.network import Network
from spotfm.chart import map_lastfm_track_chart_to_spotify
import music.db.database as database
from music.db.part_generator import PartGenerator
from music.model.user import User
from music.model.playlist import Playlist
from music.notif.notifier import notify_user_playlist_update
# Module-level logger, named after this module via __name__; used by every function below.
logger = logging.getLogger(__name__)
def get_user_and_name(user):
    """Resolve a user reference into a ``(user, username)`` pair.

    Args:
        user: A username string, which is looked up through
            ``User.collection``, or an already-loaded user object exposing a
            ``username`` attribute. ``None`` is tolerated.

    Returns:
        tuple: ``(user, username)``. For a string argument, ``user`` is
        whatever the query returns and ``username`` is the string exactly as
        supplied (the stripped, lower-cased form is only used for the query).
        For ``None`` the pair is ``(None, None)``.
    """
    if user is None:
        # Nothing to resolve. Returning an empty pair lets the caller's own
        # "user is None" check decide what to do, instead of failing here
        # with an AttributeError on ``None.username``.
        return None, None

    if isinstance(user, str):
        username = user
        found = User.collection.filter('username', '==', username.strip().lower()).get()
        return found, username

    return user, user.username
def get_playlist_and_name(playlist, user: User):
    """Resolve a playlist reference into a ``(playlist, playlist_name)`` pair.

    Args:
        playlist: A playlist name, which is looked up with
            ``user.get_playlist``, or an already-loaded playlist object
            exposing a ``name`` attribute. ``None`` is tolerated.
        user (User): Owner used for the lookup when a name is given.

    Returns:
        tuple: ``(playlist, playlist_name)``. For a string argument,
        ``playlist`` is whatever ``user.get_playlist`` returns and
        ``playlist_name`` is the string as supplied. For ``None`` the pair is
        ``(None, None)``.
    """
    if playlist is None:
        # Avoid an AttributeError on ``None.name``; the caller decides how a
        # missing playlist is reported.
        return None, None

    if isinstance(playlist, str):
        playlist_name = playlist
        return user.get_playlist(playlist_name), playlist_name

    return playlist, playlist.name
def run_user_playlist(user: User, playlist: Playlist, spotnet: SpotNetwork = None, fmnet: Network = None) -> None:
    """Generate and update a user's smart playlist.

    Resolves the user and playlist, loads the tracks of the playlist's parts,
    applies type-specific processing and sorting, appends recommendations,
    de-duplicates by name and writes the result to Spotify. Afterwards the
    playlist's ``last_updated`` is stamped and persisted, and
    ``notify_user_playlist_update`` is called.

    Args:
        user (User): Subject user, or a username string to look up.
        playlist (Playlist): User's subject playlist, or its name to look up.
        spotnet (SpotNetwork, optional): Spotframework network for Spotify operations.
            Obtained from ``database.get_authed_spotify_network`` when None. Defaults to None.
        fmnet (Network, optional): Fmframework network for Last.fm operations.
            Not referenced in this function body. Defaults to None.

    Raises:
        NameError: User not found
        NameError: Playlist not found
        AttributeError: Playlist has no URI
        NameError: No spotframework network available
        SpotifyNetworkException: spotframework error when retrieving user playlists
    """

    # PRE-RUN CHECKS

    user, username = get_user_and_name(user)

    if user is None:
        logger.error(f'user {username} not found')
        raise NameError(f'User {username} not found')

    playlist, playlist_name = get_playlist_and_name(playlist, user)

    # A failed lookup must surface as the documented NameError rather than an
    # incidental AttributeError from dereferencing None below.
    if playlist is None:
        logger.error(f'playlist {playlist_name} not found for {username}')
        raise NameError(f'Playlist {playlist_name} not found ({username})')

    if playlist.uri is None:
        logger.critical(f'no playlist id to populate {username} / {playlist_name}')
        raise AttributeError(f'No URI for {playlist_name} ({username})')

    # END CHECKS

    logger.info(f'running {username} / {playlist_name}')

    if spotnet is None:
        spotnet = database.get_authed_spotify_network(user)

    if spotnet is None:
        logger.error(f'no spotify network returned for {username} / {playlist_name}')
        raise NameError(f'No Spotify network returned ({username} / {playlist_name})')

    part_generator = PartGenerator(user=user)
    # NOTE: load_playlist_tracks appends the month playlist names to this list
    # in place, so execute_playlist sees them when it builds the description.
    part_names = part_generator.get_recursive_parts(playlist.name)

    playlist_tracks = load_playlist_tracks(spotnet, playlist, part_names, username)
    playlist_tracks = do_playlist_type_processing(spotnet, playlist, user, playlist_tracks)
    playlist_tracks = sort_tracks(playlist, playlist_tracks)
    # Recommendations are appended after sorting, so they land at the end.
    playlist_tracks += get_recommendations(spotnet, playlist, username, playlist_tracks)
    playlist_tracks = deduplicate_by_name(playlist_tracks)

    execute_playlist(spotnet, playlist, part_names, username, playlist_tracks)

    # utcnow() yields a naive datetime expressed in UTC.
    playlist.last_updated = datetime.datetime.utcnow()
    playlist.update()

    notify_user_playlist_update(user=user, playlist=playlist)
def load_user_playlists(spotnet: SpotNetwork, playlist: Playlist, username: str):
    """Map the names of the playlists returned by ``spotnet.playlists()`` to their URIs.

    Unless ``playlist.include_spotify_owned`` is set, playlists whose owner
    display name contains "spotify" (case-insensitive) are left out.

    Args:
        spotnet (SpotNetwork): Network used to list the playlists.
        playlist (Playlist): Subject playlist; supplies the
            ``include_spotify_owned`` flag and the name used in log output.
        username (str): Name used in log output.

    Raises:
        SpotifyNetworkException: Logged and re-raised when listing fails.

    Returns:
        dict: Playlist name -> playlist URI.
    """
    try:
        available = spotnet.playlists()

        if playlist.include_spotify_owned:
            return {i.name: i.uri for i in available}

        # An owner's display name can be missing (None); treat it as an empty
        # string so such playlists count as not Spotify-owned instead of
        # raising AttributeError on ``None.lower()``.
        return {i.name: i.uri
                for i in available
                if 'spotify' not in (i.owner.display_name or '').lower()}
    except SpotifyNetworkException:
        logger.exception(f'error occured while retrieving playlists {username} / {playlist.name}')
        raise
def load_playlist_tracks(spotnet: SpotNetwork, playlist: Playlist, part_names: List[str], username: str):
    """Collect the tracks of every part playlist, plus library tracks.

    Each entry of ``part_names`` is first tried as a URI; if ``Uri`` rejects
    it with ``ValueError`` it is treated as a playlist name and resolved
    through the user's playlists. Parts that cannot be resolved or fetched are
    logged and skipped. Local tracks are removed from the collected playlist
    tracks before the result of ``load_library_tracks`` is appended.

    Note:
        ``part_names`` is modified in place: the last-month and this-month
        playlist names are appended when the corresponding flags are set.

    Args:
        spotnet (SpotNetwork): Network used for Spotify requests.
        playlist (Playlist): Subject playlist supplying the flags.
        part_names (List[str]): Part playlist names or URIs.
        username (str): Name used in log output.

    Returns:
        list: Collected tracks.
    """
    if playlist.add_last_month:
        part_names.append(monthstrings.get_last_month())
    if playlist.add_this_month:
        part_names.append(monthstrings.get_this_month())

    name_to_uri = load_user_playlists(spotnet, playlist, username)

    # LOAD PLAYLIST TRACKS
    collected = []
    for part in part_names:
        try:  # attempt to cast to uri
            part_uri = Uri(part)
        except ValueError:  # is a playlist name
            part_uri = name_to_uri.get(part)
            if part_uri is None:
                logger.warning(f'playlist {part} not found {username} / {playlist.name}')
                continue
            label = part
        else:
            label = part_uri

        try:
            fetched = spotnet.playlist_tracks(uri=part_uri, reduced_mem=True)
            if not fetched or len(fetched) == 0:
                logger.warning(f'no tracks returned for {label} {username} / {playlist.name}')
            else:
                collected += fetched
        except SpotifyNetworkException:
            logger.exception(f'error occured while retrieving {label} {username} / {playlist.name}')

    without_local = list(remove_local(collected))
    without_local += load_library_tracks(spotnet, playlist, username)
    return without_local
def load_library_tracks(spotnet: SpotNetwork, playlist: Playlist, username: str):
    """Return the user's saved tracks when the playlist asks for them.

    Args:
        spotnet (SpotNetwork): Network used to call ``saved_tracks``.
        playlist (Playlist): Subject playlist; ``include_library_tracks``
            decides whether anything is fetched.
        username (str): Name used in log output.

    Returns:
        list: A new list holding the saved tracks, or an empty list when the
        flag is off, nothing came back, or the request failed (the last two
        cases are logged).
    """
    if not playlist.include_library_tracks:
        return []

    saved = []
    try:
        response = spotnet.saved_tracks()
        if not response or len(response) == 0:
            logger.error(f'error getting library tracks {username} / {playlist.name}')
        else:
            saved += response
    except SpotifyNetworkException:
        logger.exception(f'error occured while retrieving library tracks {username} / {playlist.name}')

    return saved
def do_playlist_type_processing(spotnet: SpotNetwork, playlist: Playlist, user: User, current_tracks: List):
    """Apply the processing step that matches ``playlist.type``.

    ``'recents'`` playlists go through ``do_recents_processing`` and
    ``'fmchart'`` playlists through ``do_lastfm_chart_processing``; any other
    type passes the tracks through untouched.

    Args:
        spotnet (SpotNetwork): Network handed to the Last.fm chart step.
        playlist (Playlist): Subject playlist.
        user (User): Subject user, handed to the Last.fm chart step.
        current_tracks (List): Tracks gathered so far.

    Returns:
        List: The processed tracks.
    """
    playlist_type = playlist.type

    if playlist_type == 'fmchart':
        return do_lastfm_chart_processing(spotnet, playlist, user, current_tracks)

    if playlist_type == 'recents':
        return do_recents_processing(playlist, current_tracks)

    return current_tracks
def do_recents_processing(playlist: Playlist, current_tracks: List):
    """Keep only the tracks that pass ``added_after`` for the playlist's day boundary.

    The cut-off is the current UTC time minus ``playlist.day_boundary`` days
    (the value is converted with ``int``).

    Args:
        playlist (Playlist): Subject playlist supplying ``day_boundary``.
        current_tracks (List): Tracks gathered so far.

    Returns:
        list: The tracks yielded by ``added_after`` for that cut-off.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    window = datetime.timedelta(days=int(playlist.day_boundary))
    cutoff = now_utc - window
    return [*added_after(current_tracks, cutoff)]
def do_lastfm_chart_processing(spotnet: SpotNetwork, playlist: Playlist, user: User, current_tracks: List):
    """Append the user's Last.fm track chart, mapped to Spotify, to the tracks.

    The step is skipped, with an error logged, when the user has no Last.fm
    username, when no Last.fm network can be obtained, or when the chart
    mapping returns nothing. An unknown ``playlist.chart_range`` falls back to
    ``Network.Range.MONTH``.

    Args:
        spotnet (SpotNetwork): Network handed to the chart mapping.
        playlist (Playlist): Subject playlist supplying ``chart_range`` and
            ``chart_limit``.
        user (User): Subject user.
        current_tracks (List): Tracks gathered so far; extended in place.

    Returns:
        List: ``current_tracks``, possibly extended with the chart tracks.
    """
    if user.lastfm_username is None:
        logger.error(f'no associated last.fm username, chart source skipped {user.username} / {playlist.name}')
        return current_tracks

    try:
        period = Network.Range[playlist.chart_range]
    except KeyError:
        logger.error(f'invalid last.fm chart range found {playlist.chart_range}, '
                     f'defaulting to 1 month {user.username} / {playlist.name}')
        period = Network.Range.MONTH

    lastfm_network = database.get_authed_lastfm_network(user)
    if lastfm_network is None:
        logger.error(f'no last.fm network returned {user.username} / {playlist.name}')
        return current_tracks

    chart = map_lastfm_track_chart_to_spotify(spotnet=spotnet,
                                              fmnet=lastfm_network,
                                              period=period,
                                              limit=playlist.chart_limit)

    if chart is None or len(chart) == 0:
        logger.error(f'no tracks returned {user.username} / {playlist.name}')
    else:
        current_tracks += chart

    return current_tracks
def sort_tracks(playlist: Playlist, current_tracks: List):
    """Order the tracks according to the playlist's settings.

    Shuffling takes precedence and happens in place. Otherwise every type
    except ``'fmchart'`` is sorted with ``sort_by_release_date`` in reverse
    order; ``'fmchart'`` playlists keep their existing order.

    Args:
        playlist (Playlist): Subject playlist supplying ``shuffle`` and ``type``.
        current_tracks (List): Tracks to order.

    Returns:
        List: The ordered tracks.
    """
    if playlist.shuffle:
        random.shuffle(current_tracks)
    elif playlist.type != 'fmchart':
        current_tracks = sort_by_release_date(tracks=current_tracks, reverse=True)

    return current_tracks
def get_recommendations(spotnet: SpotNetwork, playlist: Playlist, username: str, current_tracks: List):
    """Fetch recommended tracks seeded from a random sample of the current tracks.

    Up to five tracks are sampled from ``current_tracks``; those whose URI is
    of track type are used as seeds.

    Args:
        spotnet (SpotNetwork): Network used to request recommendations.
        playlist (Playlist): Subject playlist supplying
            ``include_recommendations`` and ``recommendation_sample``.
        username (str): Name used in log output.
        current_tracks (List): Tracks to sample seeds from.

    Returns:
        list: The recommended tracks. Always a list: empty when
        recommendations are disabled, when there are no usable seeds, when
        the response carries no tracks, or when the request fails. The caller
        extends its track list with this value, so the raw response object or
        None must never be returned.
    """
    if not playlist.include_recommendations:
        return []

    try:
        sampled = random.sample(current_tracks, k=min(5, len(current_tracks)))
        seed_ids = [track.uri.object_id
                    for track, _ in get_track_objects(sampled)
                    if track.uri.object_type == Uri.ObjectType.track]

        if not seed_ids:
            # A recommendations request needs at least one seed; skip the
            # network call instead of sending an empty seed list.
            logger.error(f'no seed tracks for recommendations {username} / {playlist.name}')
            return []

        response = spotnet.recommendations(tracks=seed_ids,
                                           response_limit=playlist.recommendation_sample)
    except SpotifyNetworkException:
        logger.exception(f'error occured while generating recommendations {username} / {playlist.name}')
        return []

    if response and response.tracks:
        return response.tracks

    logger.error(f'error getting recommendations {username} / {playlist.name}')
    return []
def execute_playlist(spotnet: SpotNetwork, playlist: Playlist, part_names, username: str, current_tracks: List):
    """Write the tracks to the Spotify playlist and refresh its description.

    The description is ``playlist.description_overwrite`` when set, otherwise
    the sorted part names joined with `` / ``; ``playlist.description_suffix``
    is appended after `` - `` when set. An empty description is logged and the
    description update is skipped. Spotify network errors are logged, not
    raised.

    Args:
        spotnet (SpotNetwork): Network used for both Spotify requests.
        playlist (Playlist): Subject playlist supplying the URI and
            description settings.
        part_names: Part names used for the generated description.
        username (str): Name used in log output.
        current_tracks (List): Tracks to write to the playlist.

    Returns:
        None
    """
    try:
        track_uris = [track.uri for track, _ in get_track_objects(current_tracks)]
        spotnet.replace_playlist_tracks(uri=playlist.uri, uris=track_uris)

        description = playlist.description_overwrite
        if not description:
            description = ' / '.join(sorted(part_names))

        if playlist.description_suffix:
            description += f' - {str(playlist.description_suffix)}'

        if description is not None and len(description) > 0:
            try:
                spotnet.change_playlist_details(uri=playlist.uri, description=description)
            except SpotifyNetworkException:
                logger.exception(f'error changing description for {username} / {playlist.name}')
        else:
            logger.error(f'no string generated {username} / {playlist.name}')
            return None

    except SpotifyNetworkException:
        logger.exception(f'error executing {username} / {playlist.name}')