diff --git a/README.md b/README.md index 9d4cd3b..0d3723d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,7 @@ fmframework ================ -python scripting framework for last.fm \ No newline at end of file +Python net wrapper and scripting framework for Last.fm. +component library of [Music Tools](https://github.com/Sarsoo/Music-Tools). + +* Photo downloading and arrangement using OpenCV (Charts) \ No newline at end of file diff --git a/fmframework/chart/__init__.py b/fmframework/chart/__init__.py index b12e38b..0d6dbab 100644 --- a/fmframework/chart/__init__.py +++ b/fmframework/chart/__init__.py @@ -2,10 +2,8 @@ from bs4 import BeautifulSoup import requests from datetime import date -from fmframework.model.album import Album -from fmframework.model.artist import Artist -from fmframework.model.fm import Image -from fmframework.net.network import Network +from fmframework.model import Album, Artist, Image +from fmframework.net.network import Network, LastFMNetworkException import fmframework.image import logging @@ -35,7 +33,10 @@ def get_populated_album_chart(net: Network, username: str, from_date: date, to_d albums = [] for counter, scraped in enumerate(chart): logger.debug(f'populating {counter+1} of {len(chart)}') - albums.append(net.get_album(name=scraped.name, artist=scraped.artist.name)) + try: + albums.append(net.get_album(name=scraped.name, artist=scraped.artist.name)) + except LastFMNetworkException as e: + logger.error(f'error occured during album retrieval - {e}') return albums diff --git a/fmframework/image/__init__.py b/fmframework/image/__init__.py index 75767e3..2c3d033 100644 --- a/fmframework/image/__init__.py +++ b/fmframework/image/__init__.py @@ -1,7 +1,7 @@ import numpy as np from typing import List from fmframework.net.network import Network, ImageSizeNotAvailableException -from fmframework.model.fm import Image +from fmframework.model import Image import logging logger = logging.getLogger(__name__) diff --git a/fmframework/io/csv.py b/fmframework/io/csv.py index 8743fcc..d49927c 100644 --- a/fmframework/io/csv.py +++ b/fmframework/io/csv.py @@ -2,7 +2,8 @@ from csv import DictWriter import datetime import logging from typing import List -from fmframework.model.fm import Scrobble +from fmframework.model import Scrobble + logger = logging.getLogger(__name__) headers = ['track', 'album', 'artist', 'time', 'track id', 'album id', 'artist id'] diff --git a/fmframework/model/__init__.py b/fmframework/model/__init__.py index e69de29..4a63ce4 100644 --- a/fmframework/model/__init__.py +++ b/fmframework/model/__init__.py @@ -0,0 +1,96 @@ +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from typing import List + + +class Image: + class Size(Enum): + other = 0 + small = 1 + medium = 2 + large = 3 + extralarge = 4 + mega = 5 + + def __init__(self, size: Size, link: str): + self.size = size + self.link = link + + def __str__(self): + return f'{self.size.name} - {self.link}' + + +@dataclass +class Wiki: + published: datetime = None + summary: str = None + content: str = None + + def __post_init__(self): + if isinstance(self.published, str): + self.published = datetime.strptime(self.published, '%d %b %Y, %H:%M') + + +@dataclass +class LastFM: + name: str = None + url: str = None + mbid: str = None + listeners: int = None + play_count: int = None + user_scrobbles: int = None + wiki: Wiki = None + images: List[Image] = None + + def __str__(self): + return self.name + + +@dataclass +class Artist(LastFM): + def __str__(self): + return f'{self.name}' + + +@dataclass +class Album(LastFM): + artist: Artist = None + + def __str__(self): + return f'{self.name} / {self.artist}' + + +@dataclass +class Track(LastFM): + album: Album = None + artist: Artist = None + + def __str__(self): + return f'{self.name} / {self.album} / {self.artist}' + + +class WeeklyChart: + def __init__(self, from_time, to_time): + self.from_secs = from_time + self.to_secs = to_time + + @property + def from_date(self): + return datetime.fromtimestamp(self.from_secs) + + @property + def to_date(self): + return datetime.fromtimestamp(self.to_secs) + + def __str__(self): + return f'{self.from_secs} -> {self.to_secs}' + + +@dataclass +class Scrobble: + track: Track = None + time: datetime = None + + def __str__(self): + return self.track diff --git a/fmframework/model/album.py b/fmframework/model/album.py deleted file mode 100644 index 8aae056..0000000 --- a/fmframework/model/album.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import List -from fmframework.model.fm import LastFM, Wiki, Image -from fmframework.model.artist import Artist -from fmframework.util.console import Color - - -class Album(LastFM): - def __init__(self, - name: str = None, - url: str = None, - mbid: str = None, - listeners: int = None, - play_count: int = None, - user_scrobbles: int = None, - wiki: Wiki = None, - artist: Artist = None, - images: List[Image] = None): - super().__init__(name=name, - url=url, - mbid=mbid, - listeners=listeners, - play_count=play_count, - user_scrobbles=user_scrobbles, - wiki=wiki, - images=images) - self.artist = artist - - def __str__(self): - return f'{self.name} / {self.artist}' - - def __repr__(self): - return Color.DARKCYAN + Color.BOLD + 'Album' + Color.END + f': {self.name} artist({repr(self.artist)}) ' \ - + super().__repr__() - - @staticmethod - def wrap(name: str = None, - artist: str = None, - url: str = None, - mbid: str = None, - listeners: int = None, - play_count: int = None, - user_scrobbles: int = None): - return Album(name=name, - artist=Artist(name=artist), - url=url, - mbid=mbid, - listeners=listeners, - play_count=play_count, - user_scrobbles=user_scrobbles) diff --git a/fmframework/model/artist.py b/fmframework/model/artist.py deleted file mode 100644 index 70ac84b..0000000 --- a/fmframework/model/artist.py +++ /dev/null @@ -1,29 +0,0 @@ -from typing import List -from fmframework.util.console import Color -from fmframework.model.fm import LastFM, Wiki, Image - - -class Artist(LastFM): - def __init__(self, - name: str, - url: str = None, - mbid: str = None, - listeners: int = None, - play_count: int = None, - user_scrobbles: int = None, - wiki: Wiki = None, - images: List[Image] = None): - super().__init__(name=name, - url=url, - mbid=mbid, - listeners=listeners, - play_count=play_count, - user_scrobbles=user_scrobbles, - wiki=wiki, - images=images) - - def __str__(self): - return f'{self.name}' - - def __repr__(self): - return Color.PURPLE + Color.BOLD + 'Artist' + Color.END + f': {self.name} ' + super().__repr__() diff --git a/fmframework/model/fm.py b/fmframework/model/fm.py deleted file mode 100644 index 632aaf2..0000000 --- a/fmframework/model/fm.py +++ /dev/null @@ -1,99 +0,0 @@ -from __future__ import annotations -from fmframework.util.console import Color -from datetime import datetime -from enum import Enum -from typing import List - -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from fmframework.model.track import Track - - -class Image: - class Size(Enum): - other = 0 - small = 1 - medium = 2 - large = 3 - extralarge = 4 - mega = 5 - - def __init__(self, size: Size, link: str): - self.size = size - self.link = link - - def __str__(self): - return f'{self.size.name} - {self.link}' - - -class Wiki: - def __init__(self, - date: datetime = None, - summary: str = None, - content: str = None): - self.date = date - self.summary = summary - self.content = content - - def __repr__(self): - return Color.YELLOW + Color.BOLD + 'Wiki:' + Color.END + \ - f': {self.date}, {self.summary}, {self.content}' - - -class LastFM: - def __init__(self, - name: str = None, - url: str = None, - mbid: str = None, - listeners: int = None, - play_count: int = None, - user_scrobbles: int = None, - wiki: Wiki = None, - images: List[Image] = None): - self.name = name - self.url = url - self.mbid = mbid - self.listeners = listeners - self.play_count = play_count - self.user_scrobbles = user_scrobbles - self.wiki = wiki - self.images = images - - def __str__(self): - return self.name - - def __repr__(self): - return Color.RED + Color.BOLD + 'LastFM' + Color.END + \ - f': {self.name}, user({self.user_scrobbles}), play_count({self.play_count}), ' \ - f'listeners({self.listeners}), wiki({self.wiki})' - - -class WeeklyChart: - def __init__(self, from_time, to_time): - self.from_secs = from_time - self.to_secs = to_time - - @property - def from_date(self): - return datetime.fromtimestamp(self.from_secs) - - @property - def to_date(self): - return datetime.fromtimestamp(self.to_secs) - - def __str__(self): - return f'{self.from_secs} -> {self.to_secs}' - - -class Scrobble: - def __init__(self, - track: Track = None, - time: datetime = None): - self.track = track - self.time = time - - def __str__(self): - return self.track - - def __repr__(self): - return Color.BLUE + Color.BOLD + 'Scrobble' + Color.END + f': {self.time} {repr(self.track)}' diff --git a/fmframework/model/track.py b/fmframework/model/track.py deleted file mode 100644 index cac3d40..0000000 --- a/fmframework/model/track.py +++ /dev/null @@ -1,55 +0,0 @@ -from typing import List -from fmframework.model.fm import LastFM, Wiki, Image -from fmframework.model.album import Album -from fmframework.model.artist import Artist -from fmframework.util.console import Color - - -class Track(LastFM): - def __init__(self, - name: str = None, - url: str = None, - mbid: str = None, - listeners: int = 0, - play_count: int = 0, - user_scrobbles: int = 0, - wiki: Wiki = None, - album: Album = None, - artist: Artist = None, - images: List[Image] = None): - super().__init__(name=name, - url=url, - mbid=mbid, - listeners=listeners, - play_count=play_count, - user_scrobbles=user_scrobbles, - wiki=wiki, - images=images) - self.album = album - self.artist = artist - - def __str__(self): - return f'{self.name} / {self.album} / {self.artist}' - - def __repr__(self): - return Color.YELLOW + Color.BOLD + 'Track' + Color.END + \ - f': {self.name} album({repr(self.album)}), artist({repr(self.artist)}) ' + super().__repr__() - - @staticmethod - def wrap(name: str = None, - artist: str = None, - album: str = None, - album_artist: str = None, - url: str = None, - mbid: str = None, - listeners: int = None, - play_count: int = None, - user_scrobbles: int = None): - return Track(name=name, - album=Album.wrap(name=album, artist=album_artist), - artist=Artist(artist), - url=url, - mbid=mbid, - listeners=listeners, - play_count=play_count, - user_scrobbles=user_scrobbles) diff --git a/fmframework/net/network.py b/fmframework/net/network.py index e125554..9af0e18 100644 --- a/fmframework/net/network.py +++ b/fmframework/net/network.py @@ -1,4 +1,5 @@ import requests +from dataclasses import dataclass from typing import Optional, List, Union from copy import deepcopy import logging @@ -9,10 +10,7 @@ from datetime import datetime, date, time, timedelta import numpy as np import cv2 -from fmframework.model.fm import Scrobble, Wiki, Image, WeeklyChart -from fmframework.model.track import Track -from fmframework.model.album import Album -from fmframework.model.artist import Artist +from fmframework.model import Album, Artist, Image, Wiki, WeeklyChart, Scrobble, Track from fmframework import config_directory logger = logging.getLogger(__name__) @@ -22,6 +20,16 @@ class ImageSizeNotAvailableException(Exception): pass +@dataclass +class LastFMNetworkException(Exception): + http_code: int + error_code: int + message: str = None + + def __str__(self): + return "Last.fm Network Exception: (%s/%s) %s" % (self.http_code, self.error_code, self.message) + + class Network: class Range(Enum): @@ -36,11 +44,55 @@ class Network: self.api_key = api_key self.username = username + self.rsession = requests.Session() self.retry_counter = 0 + def net_call(self, + http_method: str, + method: str, + params: dict = None, + data: dict = None, + json: dict = None, + headers: dict = None) -> dict: + + http_method = http_method.strip().upper() + + response = self.rsession.request(method=http_method, + url='http://ws.audioscrobbler.com/2.0/', + headers=headers, + params=params, + json=json, + data=data) + resp = response.json() + + if 200 <= response.status_code < 300: + logger.debug(f'{http_method} {method} {response.status_code}') + return resp + + code = resp.get('error', None) + message = resp.get('message', None) + + if code: + if code in [8, 11, 16]: + if self.retry_counter < 5: + self.retry_counter += 1 + logger.warning(f'{method} {response.status_code} {code} {message} retyring') + return self.net_call(http_method=http_method, + method=method, + params=params, + data=data, + json=json, + headers=headers) + else: + self.retry_counter = 0 + + logger.error(f'{method} {response.status_code} {code} {message} retry limit reached') + raise LastFMNetworkException(http_code=response.status_code, error_code=code, message=message) + def get_request(self, method: str, - params: dict = None) -> Optional[dict]: + params: dict = None, + **kwargs) -> dict: data = { "format": 'json', @@ -49,50 +101,20 @@ class Network: } if params is not None: data.update(params) + if kwargs is not None: + data.update({i: j for i, j in kwargs.items() if j is not None}) - req = requests.get('http://ws.audioscrobbler.com/2.0/', params=data) + return self.net_call(http_method='GET', method=method, params=data) - if 200 <= req.status_code < 300: - logger.debug(f'{method} {req.status_code}') - return req.json() - else: - resp = req.json() - - code = resp.get('error', None) - message = resp.get('message', None) - - if code: - if code == 8: - if self.retry_counter < 5: - self.retry_counter += 1 - logger.warning(f'{method} {req.status_code} {code} {message} retyring') - return self.get_request(method, params) - else: - self.retry_counter = 0 - logger.error(f'{method} {req.status_code} {code} {message} retry limit reached') - else: - logger.error(f'{method} {req.status_code} {code} {message} retry limit reached') - else: - if message: - logger.error(f'{method} {req.status_code} {message}') - else: - logger.error(f'{method} {req.status_code}') - - def get_user_scrobble_count(self, username: str = None): + def get_user_scrobble_count(self, username: str = None) -> int: if username is None: username = self.username logger.info(f'getting scrobble count {username}') - - params = { - 'user': username - } - - resp = self.get_request(method='user.getinfo', params=params) - - if resp: - return int(resp.get('user', {}).get('playcount', None)) - else: - logger.error('no response') + return int( + self.get_request(method='user.getinfo', user=username) + .get('user', {}) + .get('playcount', None) + ) def get_recent_tracks(self, username: str = None, @@ -106,7 +128,7 @@ class Network: logger.info(f'pulling all tracks') params = { - 'user': self.username if username is None else username + 'user': username or self.username } if from_time is not None: @@ -130,21 +152,19 @@ class Network: input_date: date, username: str = None, limit: int = None) -> Optional[List[Scrobble]]: - logger.info(f'getting {input_date} scrobbles for {self.username if username is None else username}') + logger.info(f'getting {input_date} scrobbles for {username or self.username}') midnight = time(hour=0, minute=0, second=0) from_date = datetime.combine(date=input_date, time=midnight) to_date = datetime.combine(date=input_date + timedelta(days=1), time=midnight) - scrobbles = self.get_recent_tracks(username=username, from_time=from_date, to_time=to_date, limit=limit) - - return scrobbles + return self.get_recent_tracks(username=username, from_time=from_date, to_time=to_date, limit=limit) def get_scrobble_count_from_date(self, input_date: date, username: str = None, limit: int = None) -> int: - logger.info(f'getting {input_date} scrobble count for {self.username if username is None else username}') + logger.info(f'getting {input_date} scrobble count for {username or self.username}') scrobbles = self.get_scrobbles_from_date(input_date=input_date, username=username, limit=limit) @@ -157,78 +177,59 @@ class Network: name: str, artist: str, username: str = None) -> Optional[Track]: - logger.info(f'getting {name} / {artist} for {self.username if username is None else username}') + logger.info(f'getting {name} / {artist} for {username or self.username}') - params = { - 'track': name, - 'artist': artist, - 'user': self.username if username is None else username - } + resp = self.get_request('track.getInfo', + track=name, + artist=artist, + user=username or self.username) - resp = self.get_request('track.getInfo', params=params) - - if resp: - if resp.get('track'): - return self.parse_track(resp['track']) - else: - logging.error(f'abnormal response - {resp}') + if resp.get('track'): + return self.parse_track(resp['track']) else: - logger.error('no response') + logging.error(f'abnormal response - {resp}') def get_album(self, name: str, artist: str, username: str = None) -> Optional[Album]: - logger.info(f'getting {name} / {artist} for {self.username if username is None else username}') + logger.info(f'getting {name} / {artist} for {username or self.username}') - params = { - 'album': name, - 'artist': artist, - 'user': self.username if username is None else username - } + resp = self.get_request('album.getInfo', + album=name, + artist=artist, + user=username or self.username) - resp = self.get_request('album.getInfo', params=params) - - if resp: - if resp.get('album'): - return self.parse_album(resp['album']) - else: - logging.error(f'abnormal response - {resp}') + if resp.get('album'): + return self.parse_album(resp['album']) else: - logger.error('no response') + logging.error(f'abnormal response - {resp}') def get_artist(self, name: str, username: str = None) -> Optional[Artist]: - logger.info(f'getting {name} for {self.username if username is None else username}') + logger.info(f'getting {name} for {username or self.username}') - params = { - 'artist': name, - 'user': self.username if username is None else username - } + resp = self.get_request('artist.getInfo', + artist=name, + user=username or self.username) - resp = self.get_request('artist.getInfo', params=params) - - if resp: - if resp.get('artist'): - return self.parse_artist(resp['artist']) - else: - logging.error(f'abnormal response - {resp}') + if resp.get('artist'): + return self.parse_artist(resp['artist']) else: - logger.error('no response') + logging.error(f'abnormal response - {resp}') def get_top_tracks(self, period: Range, username: str = None, limit: int = None): if limit is not None: - logger.info(f'pulling top {limit} tracks from {period.value} ' - f'for {self.username if username is None else username}') + logger.info(f'pulling top {limit} tracks from {period.value} for {username or self.username}') else: - logger.info(f'pulling top tracks from {period.value} for {self.username if username is None else username}') + logger.info(f'pulling top tracks from {period.value} for {username or self.username}') params = { - 'user': self.username if username is None else username, + 'user': username or self.username, 'period': period.value } @@ -242,13 +243,12 @@ class Network: username: str = None, limit: int = None): if limit is not None: - logger.info(f'pulling top {limit} albums from {period.value} ' - f'for {self.username if username is None else username}') + logger.info(f'pulling top {limit} albums from {period.value} for {username or self.username}') else: - logger.info(f'pulling top albums from {period.value} for {self.username if username is None else username}') + logger.info(f'pulling top albums from {period.value} for {username or self.username}') params = { - 'user': self.username if username is None else username, + 'user': username or self.username, 'period': period.value } @@ -262,14 +262,12 @@ class Network: username: str = None, limit: int = None): if limit is not None: - logger.info(f'pulling top {limit} artists from {period.value} ' - f'for {self.username if username is None else username}') + logger.info(f'pulling top {limit} artists from {period.value} for {username or self.username}') else: - logger.info(f'pulling top artists from {period.value} ' - f'for {self.username if username is None else username}') + logger.info(f'pulling top artists from {period.value} for {username or self.username}') params = { - 'user': self.username if username is None else username, + 'user': username or self.username, 'period': period.value } @@ -368,8 +366,7 @@ class Network: def get_weekly_charts(self, username: str = None): logger.info('getting weekly chart list') - params = {'user': self.username if username is None else username} - resp = self.get_request('user.getweeklychartlist', params=params) + resp = self.get_request('user.getweeklychartlist', user=username or self.username) if resp: return [WeeklyChart(from_time=int(i['from']), to_time=int(i['to'])) for i in resp.get('weeklychartlist', {}).get('chart', [])] @@ -396,13 +393,13 @@ class Network: if limit is not None: logger.info(f'pulling top {limit} {object_type}s from {chart.from_date} to {chart.to_date} ' - f'for {self.username if username is None else username}') + f'for {username or self.username}') else: logger.info(f'pulling top {object_type}s from {chart.from_date} to {chart.to_date} ' - f'for {self.username if username is None else username}') + f'for {username or self.username}') params = { - 'user': self.username if username is None else username, + 'user': username or self.username, 'from': from_time, 'to': to_time } @@ -422,7 +419,7 @@ class Network: @staticmethod def parse_wiki(wiki_dict) -> Optional[Wiki]: if wiki_dict: - return Wiki(date=datetime.strptime(wiki_dict.get('published', None), '%d %b %Y, %H:%M'), + return Wiki(published=datetime.strptime(wiki_dict.get('published', None), '%d %b %Y, %H:%M'), summary=wiki_dict.get('summary', None), content=wiki_dict.get('content', None)) else: @@ -606,18 +603,13 @@ class PageCollection: items=items) +@dataclass class Page: - def __init__(self, - number: int, - size: int, - total: int, - total_pages: int, - items: list): - self.number = number - self.size = size - self.total = total - self.total_pages = total_pages - self.items = items + number: int + size: int + total: int + total_pages: int + items: list def __len__(self): return len(self.items)