updated net code with exceptions, using dataclasses

This commit is contained in:
aj 2020-06-28 10:04:47 +01:00
parent 2394ccc536
commit 6715c46721
10 changed files with 222 additions and 361 deletions

View File

@ -1,4 +1,7 @@
fmframework fmframework
================ ================
python scripting framework for last.fm Python net wrapper and scripting framework for Last.fm.
component library of [Music Tools](https://github.com/Sarsoo/Music-Tools).
* Photo downloading and arrangement using OpenCV (Charts)

View File

@ -2,10 +2,8 @@ from bs4 import BeautifulSoup
import requests import requests
from datetime import date from datetime import date
from fmframework.model.album import Album from fmframework.model import Album, Artist, Image
from fmframework.model.artist import Artist from fmframework.net.network import Network, LastFMNetworkException
from fmframework.model.fm import Image
from fmframework.net.network import Network
import fmframework.image import fmframework.image
import logging import logging
@ -35,7 +33,10 @@ def get_populated_album_chart(net: Network, username: str, from_date: date, to_d
albums = [] albums = []
for counter, scraped in enumerate(chart): for counter, scraped in enumerate(chart):
logger.debug(f'populating {counter+1} of {len(chart)}') logger.debug(f'populating {counter+1} of {len(chart)}')
try:
albums.append(net.get_album(name=scraped.name, artist=scraped.artist.name)) albums.append(net.get_album(name=scraped.name, artist=scraped.artist.name))
except LastFMNetworkException as e:
logger.error(f'error occured during album retrieval - {e}')
return albums return albums

View File

@ -1,7 +1,7 @@
import numpy as np import numpy as np
from typing import List from typing import List
from fmframework.net.network import Network, ImageSizeNotAvailableException from fmframework.net.network import Network, ImageSizeNotAvailableException
from fmframework.model.fm import Image from fmframework.model import Image
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -2,7 +2,8 @@ from csv import DictWriter
import datetime import datetime
import logging import logging
from typing import List from typing import List
from fmframework.model.fm import Scrobble from fmframework.model import Scrobble
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
headers = ['track', 'album', 'artist', 'time', 'track id', 'album id', 'artist id'] headers = ['track', 'album', 'artist', 'time', 'track id', 'album id', 'artist id']

View File

@ -0,0 +1,96 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List
class Image:
class Size(Enum):
other = 0
small = 1
medium = 2
large = 3
extralarge = 4
mega = 5
def __init__(self, size: Size, link: str):
self.size = size
self.link = link
def __str__(self):
return f'{self.size.name} - {self.link}'
@dataclass
class Wiki:
published: datetime = None
summary: str = None
content: str = None
def __post_init__(self):
if isinstance(self.published, str):
self.published = datetime.strptime(self.published, '%d %b %Y, %H:%M')
@dataclass
class LastFM:
name: str = None
url: str = None
mbid: str = None
listeners: int = None
play_count: int = None
user_scrobbles: int = None
wiki: Wiki = None
images: List[Image] = None
def __str__(self):
return self.name
@dataclass
class Artist(LastFM):
def __str__(self):
return f'{self.name}'
@dataclass
class Album(LastFM):
artist: Artist = None
def __str__(self):
return f'{self.name} / {self.artist}'
@dataclass
class Track(LastFM):
album: Album = None
artist: Artist = None
def __str__(self):
return f'{self.name} / {self.album} / {self.artist}'
class WeeklyChart:
def __init__(self, from_time, to_time):
self.from_secs = from_time
self.to_secs = to_time
@property
def from_date(self):
return datetime.fromtimestamp(self.from_secs)
@property
def to_date(self):
return datetime.fromtimestamp(self.to_secs)
def __str__(self):
return f'{self.from_secs} -> {self.to_secs}'
@dataclass
class Scrobble:
track: Track = None
time: datetime = None
def __str__(self):
return self.track

View File

@ -1,49 +0,0 @@
from typing import List
from fmframework.model.fm import LastFM, Wiki, Image
from fmframework.model.artist import Artist
from fmframework.util.console import Color
class Album(LastFM):
def __init__(self,
name: str = None,
url: str = None,
mbid: str = None,
listeners: int = None,
play_count: int = None,
user_scrobbles: int = None,
wiki: Wiki = None,
artist: Artist = None,
images: List[Image] = None):
super().__init__(name=name,
url=url,
mbid=mbid,
listeners=listeners,
play_count=play_count,
user_scrobbles=user_scrobbles,
wiki=wiki,
images=images)
self.artist = artist
def __str__(self):
return f'{self.name} / {self.artist}'
def __repr__(self):
return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + f': {self.name} artist({repr(self.artist)}) ' \
+ super().__repr__()
@staticmethod
def wrap(name: str = None,
artist: str = None,
url: str = None,
mbid: str = None,
listeners: int = None,
play_count: int = None,
user_scrobbles: int = None):
return Album(name=name,
artist=Artist(name=artist),
url=url,
mbid=mbid,
listeners=listeners,
play_count=play_count,
user_scrobbles=user_scrobbles)

View File

@ -1,29 +0,0 @@
from typing import List
from fmframework.util.console import Color
from fmframework.model.fm import LastFM, Wiki, Image
class Artist(LastFM):
def __init__(self,
name: str,
url: str = None,
mbid: str = None,
listeners: int = None,
play_count: int = None,
user_scrobbles: int = None,
wiki: Wiki = None,
images: List[Image] = None):
super().__init__(name=name,
url=url,
mbid=mbid,
listeners=listeners,
play_count=play_count,
user_scrobbles=user_scrobbles,
wiki=wiki,
images=images)
def __str__(self):
return f'{self.name}'
def __repr__(self):
return Color.PURPLE + Color.BOLD + 'Artist' + Color.END + f': {self.name} ' + super().__repr__()

View File

@ -1,99 +0,0 @@
from __future__ import annotations
from fmframework.util.console import Color
from datetime import datetime
from enum import Enum
from typing import List
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fmframework.model.track import Track
class Image:
class Size(Enum):
other = 0
small = 1
medium = 2
large = 3
extralarge = 4
mega = 5
def __init__(self, size: Size, link: str):
self.size = size
self.link = link
def __str__(self):
return f'{self.size.name} - {self.link}'
class Wiki:
def __init__(self,
date: datetime = None,
summary: str = None,
content: str = None):
self.date = date
self.summary = summary
self.content = content
def __repr__(self):
return Color.YELLOW + Color.BOLD + 'Wiki:' + Color.END + \
f': {self.date}, {self.summary}, {self.content}'
class LastFM:
def __init__(self,
name: str = None,
url: str = None,
mbid: str = None,
listeners: int = None,
play_count: int = None,
user_scrobbles: int = None,
wiki: Wiki = None,
images: List[Image] = None):
self.name = name
self.url = url
self.mbid = mbid
self.listeners = listeners
self.play_count = play_count
self.user_scrobbles = user_scrobbles
self.wiki = wiki
self.images = images
def __str__(self):
return self.name
def __repr__(self):
return Color.RED + Color.BOLD + 'LastFM' + Color.END + \
f': {self.name}, user({self.user_scrobbles}), play_count({self.play_count}), ' \
f'listeners({self.listeners}), wiki({self.wiki})'
class WeeklyChart:
def __init__(self, from_time, to_time):
self.from_secs = from_time
self.to_secs = to_time
@property
def from_date(self):
return datetime.fromtimestamp(self.from_secs)
@property
def to_date(self):
return datetime.fromtimestamp(self.to_secs)
def __str__(self):
return f'{self.from_secs} -> {self.to_secs}'
class Scrobble:
def __init__(self,
track: Track = None,
time: datetime = None):
self.track = track
self.time = time
def __str__(self):
return self.track
def __repr__(self):
return Color.BLUE + Color.BOLD + 'Scrobble' + Color.END + f': {self.time} {repr(self.track)}'

View File

@ -1,55 +0,0 @@
from typing import List
from fmframework.model.fm import LastFM, Wiki, Image
from fmframework.model.album import Album
from fmframework.model.artist import Artist
from fmframework.util.console import Color
class Track(LastFM):
def __init__(self,
name: str = None,
url: str = None,
mbid: str = None,
listeners: int = 0,
play_count: int = 0,
user_scrobbles: int = 0,
wiki: Wiki = None,
album: Album = None,
artist: Artist = None,
images: List[Image] = None):
super().__init__(name=name,
url=url,
mbid=mbid,
listeners=listeners,
play_count=play_count,
user_scrobbles=user_scrobbles,
wiki=wiki,
images=images)
self.album = album
self.artist = artist
def __str__(self):
return f'{self.name} / {self.album} / {self.artist}'
def __repr__(self):
return Color.YELLOW + Color.BOLD + 'Track' + Color.END + \
f': {self.name} album({repr(self.album)}), artist({repr(self.artist)}) ' + super().__repr__()
@staticmethod
def wrap(name: str = None,
artist: str = None,
album: str = None,
album_artist: str = None,
url: str = None,
mbid: str = None,
listeners: int = None,
play_count: int = None,
user_scrobbles: int = None):
return Track(name=name,
album=Album.wrap(name=album, artist=album_artist),
artist=Artist(artist),
url=url,
mbid=mbid,
listeners=listeners,
play_count=play_count,
user_scrobbles=user_scrobbles)

View File

@ -1,4 +1,5 @@
import requests import requests
from dataclasses import dataclass
from typing import Optional, List, Union from typing import Optional, List, Union
from copy import deepcopy from copy import deepcopy
import logging import logging
@ -9,10 +10,7 @@ from datetime import datetime, date, time, timedelta
import numpy as np import numpy as np
import cv2 import cv2
from fmframework.model.fm import Scrobble, Wiki, Image, WeeklyChart from fmframework.model import Album, Artist, Image, Wiki, WeeklyChart, Scrobble, Track
from fmframework.model.track import Track
from fmframework.model.album import Album
from fmframework.model.artist import Artist
from fmframework import config_directory from fmframework import config_directory
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -22,6 +20,16 @@ class ImageSizeNotAvailableException(Exception):
pass pass
@dataclass
class LastFMNetworkException(Exception):
http_code: int
error_code: int
message: str = None
def __str__(self):
return "Last.fm Network Exception: (%s/%s) %s" % (self.http_code, self.error_code, self.message)
class Network: class Network:
class Range(Enum): class Range(Enum):
@ -36,11 +44,55 @@ class Network:
self.api_key = api_key self.api_key = api_key
self.username = username self.username = username
self.rsession = requests.Session()
self.retry_counter = 0 self.retry_counter = 0
def net_call(self,
http_method: str,
method: str,
params: dict = None,
data: dict = None,
json: dict = None,
headers: dict = None) -> dict:
http_method = http_method.strip().upper()
response = self.rsession.request(method=http_method,
url='http://ws.audioscrobbler.com/2.0/',
headers=headers,
params=params,
json=json,
data=data)
resp = response.json()
if 200 <= response.status_code < 300:
logger.debug(f'{http_method} {method} {response.status_code}')
return resp
code = resp.get('error', None)
message = resp.get('message', None)
if code:
if code in [8, 11, 16]:
if self.retry_counter < 5:
self.retry_counter += 1
logger.warning(f'{method} {response.status_code} {code} {message} retyring')
return self.net_call(http_method=http_method,
method=method,
params=params,
data=data,
json=json,
headers=headers)
else:
self.retry_counter = 0
logger.error(f'{method} {response.status_code} {code} {message} retry limit reached')
raise LastFMNetworkException(http_code=response.status_code, error_code=code, message=message)
def get_request(self, def get_request(self,
method: str, method: str,
params: dict = None) -> Optional[dict]: params: dict = None,
**kwargs) -> dict:
data = { data = {
"format": 'json', "format": 'json',
@ -49,50 +101,20 @@ class Network:
} }
if params is not None: if params is not None:
data.update(params) data.update(params)
if kwargs is not None:
data.update({i: j for i, j in kwargs.items() if j is not None})
req = requests.get('http://ws.audioscrobbler.com/2.0/', params=data) return self.net_call(http_method='GET', method=method, params=data)
if 200 <= req.status_code < 300: def get_user_scrobble_count(self, username: str = None) -> int:
logger.debug(f'{method} {req.status_code}')
return req.json()
else:
resp = req.json()
code = resp.get('error', None)
message = resp.get('message', None)
if code:
if code == 8:
if self.retry_counter < 5:
self.retry_counter += 1
logger.warning(f'{method} {req.status_code} {code} {message} retyring')
return self.get_request(method, params)
else:
self.retry_counter = 0
logger.error(f'{method} {req.status_code} {code} {message} retry limit reached')
else:
logger.error(f'{method} {req.status_code} {code} {message} retry limit reached')
else:
if message:
logger.error(f'{method} {req.status_code} {message}')
else:
logger.error(f'{method} {req.status_code}')
def get_user_scrobble_count(self, username: str = None):
if username is None: if username is None:
username = self.username username = self.username
logger.info(f'getting scrobble count {username}') logger.info(f'getting scrobble count {username}')
return int(
params = { self.get_request(method='user.getinfo', user=username)
'user': username .get('user', {})
} .get('playcount', None)
)
resp = self.get_request(method='user.getinfo', params=params)
if resp:
return int(resp.get('user', {}).get('playcount', None))
else:
logger.error('no response')
def get_recent_tracks(self, def get_recent_tracks(self,
username: str = None, username: str = None,
@ -106,7 +128,7 @@ class Network:
logger.info(f'pulling all tracks') logger.info(f'pulling all tracks')
params = { params = {
'user': self.username if username is None else username 'user': username or self.username
} }
if from_time is not None: if from_time is not None:
@ -130,21 +152,19 @@ class Network:
input_date: date, input_date: date,
username: str = None, username: str = None,
limit: int = None) -> Optional[List[Scrobble]]: limit: int = None) -> Optional[List[Scrobble]]:
logger.info(f'getting {input_date} scrobbles for {self.username if username is None else username}') logger.info(f'getting {input_date} scrobbles for {username or self.username}')
midnight = time(hour=0, minute=0, second=0) midnight = time(hour=0, minute=0, second=0)
from_date = datetime.combine(date=input_date, time=midnight) from_date = datetime.combine(date=input_date, time=midnight)
to_date = datetime.combine(date=input_date + timedelta(days=1), time=midnight) to_date = datetime.combine(date=input_date + timedelta(days=1), time=midnight)
scrobbles = self.get_recent_tracks(username=username, from_time=from_date, to_time=to_date, limit=limit) return self.get_recent_tracks(username=username, from_time=from_date, to_time=to_date, limit=limit)
return scrobbles
def get_scrobble_count_from_date(self, def get_scrobble_count_from_date(self,
input_date: date, input_date: date,
username: str = None, username: str = None,
limit: int = None) -> int: limit: int = None) -> int:
logger.info(f'getting {input_date} scrobble count for {self.username if username is None else username}') logger.info(f'getting {input_date} scrobble count for {username or self.username}')
scrobbles = self.get_scrobbles_from_date(input_date=input_date, username=username, limit=limit) scrobbles = self.get_scrobbles_from_date(input_date=input_date, username=username, limit=limit)
@ -157,78 +177,59 @@ class Network:
name: str, name: str,
artist: str, artist: str,
username: str = None) -> Optional[Track]: username: str = None) -> Optional[Track]:
logger.info(f'getting {name} / {artist} for {self.username if username is None else username}') logger.info(f'getting {name} / {artist} for {username or self.username}')
params = { resp = self.get_request('track.getInfo',
'track': name, track=name,
'artist': artist, artist=artist,
'user': self.username if username is None else username user=username or self.username)
}
resp = self.get_request('track.getInfo', params=params)
if resp:
if resp.get('track'): if resp.get('track'):
return self.parse_track(resp['track']) return self.parse_track(resp['track'])
else: else:
logging.error(f'abnormal response - {resp}') logging.error(f'abnormal response - {resp}')
else:
logger.error('no response')
def get_album(self, def get_album(self,
name: str, name: str,
artist: str, artist: str,
username: str = None) -> Optional[Album]: username: str = None) -> Optional[Album]:
logger.info(f'getting {name} / {artist} for {self.username if username is None else username}') logger.info(f'getting {name} / {artist} for {username or self.username}')
params = { resp = self.get_request('album.getInfo',
'album': name, album=name,
'artist': artist, artist=artist,
'user': self.username if username is None else username user=username or self.username)
}
resp = self.get_request('album.getInfo', params=params)
if resp:
if resp.get('album'): if resp.get('album'):
return self.parse_album(resp['album']) return self.parse_album(resp['album'])
else: else:
logging.error(f'abnormal response - {resp}') logging.error(f'abnormal response - {resp}')
else:
logger.error('no response')
def get_artist(self, def get_artist(self,
name: str, name: str,
username: str = None) -> Optional[Artist]: username: str = None) -> Optional[Artist]:
logger.info(f'getting {name} for {self.username if username is None else username}') logger.info(f'getting {name} for {username or self.username}')
params = { resp = self.get_request('artist.getInfo',
'artist': name, artist=name,
'user': self.username if username is None else username user=username or self.username)
}
resp = self.get_request('artist.getInfo', params=params)
if resp:
if resp.get('artist'): if resp.get('artist'):
return self.parse_artist(resp['artist']) return self.parse_artist(resp['artist'])
else: else:
logging.error(f'abnormal response - {resp}') logging.error(f'abnormal response - {resp}')
else:
logger.error('no response')
def get_top_tracks(self, def get_top_tracks(self,
period: Range, period: Range,
username: str = None, username: str = None,
limit: int = None): limit: int = None):
if limit is not None: if limit is not None:
logger.info(f'pulling top {limit} tracks from {period.value} ' logger.info(f'pulling top {limit} tracks from {period.value} for {username or self.username}')
f'for {self.username if username is None else username}')
else: else:
logger.info(f'pulling top tracks from {period.value} for {self.username if username is None else username}') logger.info(f'pulling top tracks from {period.value} for {username or self.username}')
params = { params = {
'user': self.username if username is None else username, 'user': username or self.username,
'period': period.value 'period': period.value
} }
@ -242,13 +243,12 @@ class Network:
username: str = None, username: str = None,
limit: int = None): limit: int = None):
if limit is not None: if limit is not None:
logger.info(f'pulling top {limit} albums from {period.value} ' logger.info(f'pulling top {limit} albums from {period.value} for {username or self.username}')
f'for {self.username if username is None else username}')
else: else:
logger.info(f'pulling top albums from {period.value} for {self.username if username is None else username}') logger.info(f'pulling top albums from {period.value} for {username or self.username}')
params = { params = {
'user': self.username if username is None else username, 'user': username or self.username,
'period': period.value 'period': period.value
} }
@ -262,14 +262,12 @@ class Network:
username: str = None, username: str = None,
limit: int = None): limit: int = None):
if limit is not None: if limit is not None:
logger.info(f'pulling top {limit} artists from {period.value} ' logger.info(f'pulling top {limit} artists from {period.value} for {username or self.username}')
f'for {self.username if username is None else username}')
else: else:
logger.info(f'pulling top artists from {period.value} ' logger.info(f'pulling top artists from {period.value} for {username or self.username}')
f'for {self.username if username is None else username}')
params = { params = {
'user': self.username if username is None else username, 'user': username or self.username,
'period': period.value 'period': period.value
} }
@ -368,8 +366,7 @@ class Network:
def get_weekly_charts(self, username: str = None): def get_weekly_charts(self, username: str = None):
logger.info('getting weekly chart list') logger.info('getting weekly chart list')
params = {'user': self.username if username is None else username} resp = self.get_request('user.getweeklychartlist', user=username or self.username)
resp = self.get_request('user.getweeklychartlist', params=params)
if resp: if resp:
return [WeeklyChart(from_time=int(i['from']), to_time=int(i['to'])) return [WeeklyChart(from_time=int(i['from']), to_time=int(i['to']))
for i in resp.get('weeklychartlist', {}).get('chart', [])] for i in resp.get('weeklychartlist', {}).get('chart', [])]
@ -396,13 +393,13 @@ class Network:
if limit is not None: if limit is not None:
logger.info(f'pulling top {limit} {object_type}s from {chart.from_date} to {chart.to_date} ' logger.info(f'pulling top {limit} {object_type}s from {chart.from_date} to {chart.to_date} '
f'for {self.username if username is None else username}') f'for {username or self.username}')
else: else:
logger.info(f'pulling top {object_type}s from {chart.from_date} to {chart.to_date} ' logger.info(f'pulling top {object_type}s from {chart.from_date} to {chart.to_date} '
f'for {self.username if username is None else username}') f'for {username or self.username}')
params = { params = {
'user': self.username if username is None else username, 'user': username or self.username,
'from': from_time, 'from': from_time,
'to': to_time 'to': to_time
} }
@ -422,7 +419,7 @@ class Network:
@staticmethod @staticmethod
def parse_wiki(wiki_dict) -> Optional[Wiki]: def parse_wiki(wiki_dict) -> Optional[Wiki]:
if wiki_dict: if wiki_dict:
return Wiki(date=datetime.strptime(wiki_dict.get('published', None), '%d %b %Y, %H:%M'), return Wiki(published=datetime.strptime(wiki_dict.get('published', None), '%d %b %Y, %H:%M'),
summary=wiki_dict.get('summary', None), summary=wiki_dict.get('summary', None),
content=wiki_dict.get('content', None)) content=wiki_dict.get('content', None))
else: else:
@ -606,18 +603,13 @@ class PageCollection:
items=items) items=items)
@dataclass
class Page: class Page:
def __init__(self, number: int
number: int, size: int
size: int, total: int
total: int, total_pages: int
total_pages: int, items: list
items: list):
self.number = number
self.size = size
self.total = total
self.total_pages = total_pages
self.items = items
def __len__(self): def __len__(self):
return len(self.items) return len(self.items)