From 028bce6ee8339184a064afb5b7ebb8e0948c3175 Mon Sep 17 00:00:00 2001 From: aj Date: Sun, 6 Oct 2019 10:59:18 +0100 Subject: [PATCH] added handling for now playing scrobble, made page parsing more resilient added get top methods --- fmframework/model/album.py | 3 +- fmframework/model/artist.py | 2 +- fmframework/net/network.py | 104 +++++++++++++++++++++++++++++++----- 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/fmframework/model/album.py b/fmframework/model/album.py index 1629d4d..8621c80 100644 --- a/fmframework/model/album.py +++ b/fmframework/model/album.py @@ -27,7 +27,8 @@ class Album(LastFM): return f'{self.name} / {self.artist}' def __repr__(self): - return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + f': {self.name} {self.artist} ' + super().__repr__() + return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + f': {self.name} artist({repr(self.artist)}) ' \ + + super().__repr__() @staticmethod def wrap(name: str = None, diff --git a/fmframework/model/artist.py b/fmframework/model/artist.py index 3097747..75a3577 100644 --- a/fmframework/model/artist.py +++ b/fmframework/model/artist.py @@ -23,4 +23,4 @@ class Artist(LastFM): return f'{self.name}' def __repr__(self): - return Color.PURPLE + Color.BOLD + 'Artist' + Color.END + f': {self.name}' + super().__repr__() + return Color.PURPLE + Color.BOLD + 'Artist' + Color.END + f': {self.name} ' + super().__repr__() diff --git a/fmframework/net/network.py b/fmframework/net/network.py index b8b9bea..250a311 100644 --- a/fmframework/net/network.py +++ b/fmframework/net/network.py @@ -2,6 +2,7 @@ import requests from typing import Optional, List from copy import deepcopy import logging +from enum import Enum from datetime import datetime, date, time, timedelta from fmframework.model.fm import Scrobble, Wiki @@ -14,6 +15,14 @@ logger = logging.getLogger(__name__) class Network: + class Range(Enum): + OVERALL = 'overall' + WEEK = '7day' + MONTH = '1month' + QUARTER = '3month' + HALFYEAR = '6month' + YEAR = '12month' + def __init__(self, username, api_key): self.api_key = api_key @@ -85,8 +94,9 @@ class Network: items = iterator.items - if items[0].get('@attr', {}).get('nowplaying', None): - items.pop(0) + if len(items) >= 1: + if items[0].get('@attr', {}).get('nowplaying', None): + items.pop(0) return [self.parse_scrobble(i) for i in items[:limit]] @@ -172,6 +182,67 @@ class Network: else: logger.error('no response') + def get_top_tracks(self, + period: Range, + username: str = None, + limit: int = None): + if limit is not None: + logger.info(f'pulling top {limit} tracks from {period.value} ' + f'for {self.username if username is None else username}') + else: + logger.info(f'pulling top tracks from {period.value} for {self.username if username is None else username}') + + params = { + 'user': self.username if username is None else username, + 'period': period.value + } + + iterator = PageCollection(net=self, method='user.gettoptracks', params=params, response_limit=limit) + iterator.load() + + return [self.parse_track(i) for i in iterator.items] + + def get_top_albums(self, + period: Range, + username: str = None, + limit: int = None): + if limit is not None: + logger.info(f'pulling top {limit} albums from {period.value} ' + f'for {self.username if username is None else username}') + else: + logger.info(f'pulling top albums from {period.value} for {self.username if username is None else username}') + + params = { + 'user': self.username if username is None else username, + 'period': period.value + } + + iterator = PageCollection(net=self, method='user.gettopalbums', params=params, response_limit=limit) + iterator.load() + + return [self.parse_album(i) for i in iterator.items] + + def get_top_artists(self, + period: Range, + username: str = None, + limit: int = None): + if limit is not None: + logger.info(f'pulling top {limit} artists from {period.value} ' + f'for {self.username if username is None else username}') + else: + logger.info(f'pulling top artists from {period.value} ' + f'for {self.username if username is None else username}') + + params = { + 'user': self.username if username is None else username, + 'period': period.value + } + + iterator = PageCollection(net=self, method='user.gettopartists', params=params, response_limit=limit) + iterator.load() + + return [self.parse_artist(i) for i in iterator.items] + @staticmethod def parse_wiki(wiki_dict) -> Optional[Wiki]: if wiki_dict: @@ -185,13 +256,14 @@ class Network: return Artist(name=artist_dict.get('name', 'n/a'), url=artist_dict.get('url', None), mbid=artist_dict.get('mbid', None), - listeners=int(artist_dict["stats"].get('listeners', 0)), - play_count=int(artist_dict["stats"].get('playcount', 0)), - user_scrobbles=int(artist_dict["stats"].get('userplaycount', 0)), + listeners=int(artist_dict.get('stats', {}).get('listeners', 0)), + play_count=int(artist_dict.get('stats', {}).get('playcount', 0)), + user_scrobbles=int(artist_dict.get('stats', {}).get('userplaycount', + artist_dict.get('playcount', 0))), wiki=self.parse_wiki(artist_dict['wiki']) if artist_dict.get('wiki', None) else None) def parse_album(self, album_dict) -> Album: - return Album(name=album_dict.get('name', 'n/a'), + return Album(name=album_dict.get('name', album_dict.get('title', 'n/a')), url=album_dict.get('url', 'n/a'), mbid=album_dict.get('mbid', 'n/a'), listeners=int(album_dict.get('listeners', 0)), @@ -213,7 +285,7 @@ class Network: track.album = self.parse_album(track_dict['album']) if track_dict.get('artist', None): - track.album = self.parse_album(track_dict['artist']) + track.artist = self.parse_artist(track_dict['artist']) return track @@ -281,7 +353,7 @@ class PageCollection: tracker = True while len(self) < self.response_limit and tracker: page = self.iterate() - if len(page) == 0: + if len(page) == 0 or self.counter > page.total_pages: tracker = False else: self.pages.append(page) @@ -289,7 +361,7 @@ class PageCollection: tracker = True while tracker: page = self.iterate() - if len(page) == 0: + if len(page) == 0 or self.counter > page.total_pages: tracker = False else: self.pages.append(page) @@ -325,12 +397,16 @@ class PageCollection: @staticmethod def parse_page(page_dict): first_value = list(page_dict.values())[0] - items = list(first_value.values())[1] + attr = first_value['@attr'] + + del first_value['@attr'] + items = list(first_value.values())[0] + return Page( - number=first_value['@attr'].get('page', None), - size=first_value['@attr'].get('perPage', None), - total=first_value['@attr'].get('total', None), - total_pages=first_value['@attr'].get('totalPages', None), + number=int(attr.get('page', None)), + size=int(attr.get('perPage', None)), + total=int(attr.get('total', None)), + total_pages=int(attr.get('totalPages', None)), items=items)