added handling for now playing scrobble, made page parsing more resilient

added get top methods
This commit is contained in:
aj 2019-10-06 10:59:18 +01:00
parent 13120b0a47
commit 028bce6ee8
3 changed files with 93 additions and 16 deletions

View File

@ -27,7 +27,8 @@ class Album(LastFM):
return f'{self.name} / {self.artist}'
def __repr__(self):
return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + f': {self.name} {self.artist} ' + super().__repr__()
return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + f': {self.name} artist({repr(self.artist)}) ' \
+ super().__repr__()
@staticmethod
def wrap(name: str = None,

View File

@ -2,6 +2,7 @@ import requests
from typing import Optional, List
from copy import deepcopy
import logging
from enum import Enum
from datetime import datetime, date, time, timedelta
from fmframework.model.fm import Scrobble, Wiki
@ -14,6 +15,14 @@ logger = logging.getLogger(__name__)
class Network:
class Range(Enum):
OVERALL = 'overall'
WEEK = '7day'
MONTH = '1month'
QUARTER = '3month'
HALFYEAR = '6month'
YEAR = '12month'
def __init__(self, username, api_key):
self.api_key = api_key
@ -85,6 +94,7 @@ class Network:
items = iterator.items
if len(items) >= 1:
if items[0].get('@attr', {}).get('nowplaying', None):
items.pop(0)
@ -172,6 +182,67 @@ class Network:
else:
logger.error('no response')
def get_top_tracks(self,
period: Range,
username: str = None,
limit: int = None):
if limit is not None:
logger.info(f'pulling top {limit} tracks from {period.value} '
f'for {self.username if username is None else username}')
else:
logger.info(f'pulling top tracks from {period.value} for {self.username if username is None else username}')
params = {
'user': self.username if username is None else username,
'period': period.value
}
iterator = PageCollection(net=self, method='user.gettoptracks', params=params, response_limit=limit)
iterator.load()
return [self.parse_track(i) for i in iterator.items]
def get_top_albums(self,
period: Range,
username: str = None,
limit: int = None):
if limit is not None:
logger.info(f'pulling top {limit} albums from {period.value} '
f'for {self.username if username is None else username}')
else:
logger.info(f'pulling top albums from {period.value} for {self.username if username is None else username}')
params = {
'user': self.username if username is None else username,
'period': period.value
}
iterator = PageCollection(net=self, method='user.gettopalbums', params=params, response_limit=limit)
iterator.load()
return [self.parse_album(i) for i in iterator.items]
def get_top_artists(self,
period: Range,
username: str = None,
limit: int = None):
if limit is not None:
logger.info(f'pulling top {limit} artists from {period.value} '
f'for {self.username if username is None else username}')
else:
logger.info(f'pulling top artists from {period.value} '
f'for {self.username if username is None else username}')
params = {
'user': self.username if username is None else username,
'period': period.value
}
iterator = PageCollection(net=self, method='user.gettopartists', params=params, response_limit=limit)
iterator.load()
return [self.parse_artist(i) for i in iterator.items]
@staticmethod
def parse_wiki(wiki_dict) -> Optional[Wiki]:
if wiki_dict:
@ -185,13 +256,14 @@ class Network:
return Artist(name=artist_dict.get('name', 'n/a'),
url=artist_dict.get('url', None),
mbid=artist_dict.get('mbid', None),
listeners=int(artist_dict["stats"].get('listeners', 0)),
play_count=int(artist_dict["stats"].get('playcount', 0)),
user_scrobbles=int(artist_dict["stats"].get('userplaycount', 0)),
listeners=int(artist_dict.get('stats', {}).get('listeners', 0)),
play_count=int(artist_dict.get('stats', {}).get('playcount', 0)),
user_scrobbles=int(artist_dict.get('stats', {}).get('userplaycount',
artist_dict.get('playcount', 0))),
wiki=self.parse_wiki(artist_dict['wiki']) if artist_dict.get('wiki', None) else None)
def parse_album(self, album_dict) -> Album:
return Album(name=album_dict.get('name', 'n/a'),
return Album(name=album_dict.get('name', album_dict.get('title', 'n/a')),
url=album_dict.get('url', 'n/a'),
mbid=album_dict.get('mbid', 'n/a'),
listeners=int(album_dict.get('listeners', 0)),
@ -213,7 +285,7 @@ class Network:
track.album = self.parse_album(track_dict['album'])
if track_dict.get('artist', None):
track.album = self.parse_album(track_dict['artist'])
track.artist = self.parse_artist(track_dict['artist'])
return track
@ -281,7 +353,7 @@ class PageCollection:
tracker = True
while len(self) < self.response_limit and tracker:
page = self.iterate()
if len(page) == 0:
if len(page) == 0 or self.counter > page.total_pages:
tracker = False
else:
self.pages.append(page)
@ -289,7 +361,7 @@ class PageCollection:
tracker = True
while tracker:
page = self.iterate()
if len(page) == 0:
if len(page) == 0 or self.counter > page.total_pages:
tracker = False
else:
self.pages.append(page)
@ -325,12 +397,16 @@ class PageCollection:
@staticmethod
def parse_page(page_dict):
first_value = list(page_dict.values())[0]
items = list(first_value.values())[1]
attr = first_value['@attr']
del first_value['@attr']
items = list(first_value.values())[0]
return Page(
number=first_value['@attr'].get('page', None),
size=first_value['@attr'].get('perPage', None),
total=first_value['@attr'].get('total', None),
total_pages=first_value['@attr'].get('totalPages', None),
number=int(attr.get('page', None)),
size=int(attr.get('perPage', None)),
total=int(attr.get('total', None)),
total_pages=int(attr.get('totalPages', None)),
items=items)