diff --git a/.gitignore b/.gitignore index 0c6d0ce..e72b0af 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ env __pycache__ *.csv +.idea +.fm \ No newline at end of file diff --git a/backup.py b/backup.py index f5d34d5..82e876d 100644 --- a/backup.py +++ b/backup.py @@ -1,19 +1,39 @@ -import fmframework.io.csv as csvwrite -import fmframework.net.user as user +from fmframework.io.csv import export_scrobbles +from fmframework.net.network import Network -import sys, datetime, os +import sys +import os +import logging -def backupScrobbles(path): - userobj = user.User('sarsoo') +logger = logging.getLogger('fmframework') - scrobbles = userobj.getRecentTracks() +log_format = '%(asctime)s %(levelname)s %(name)s - %(funcName)s - %(message)s' + +file_handler = logging.FileHandler(".fm/backup.log") +formatter = logging.Formatter(log_format) +file_handler.setFormatter(formatter) + +logger.addHandler(file_handler) + +stream_log_format = '%(levelname)s %(name)s:%(funcName)s - %(message)s' +stream_formatter = logging.Formatter(stream_log_format) + +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(stream_formatter) + +logger.addHandler(stream_handler) + + +def backup_scrobbles(file_path): + net = Network(username='sarsoo', api_key=os.environ['FMKEY']) + + scrobbles = net.get_recent_tracks() - path = sys.argv[1] - - if not os.path.exists(path): - os.makedirs(path) + if not os.path.exists(file_path): + os.makedirs(file_path) + + export_scrobbles(scrobbles, file_path) - csvwrite.exportScrobbles(scrobbles, path) if __name__ == '__main__': - backupScrobbles(sys.argv[1]) + backup_scrobbles(sys.argv[1]) diff --git a/fmframework/__init__.py b/fmframework/__init__.py index e69de29..b061bef 100644 --- a/fmframework/__init__.py +++ b/fmframework/__init__.py @@ -0,0 +1,4 @@ +import logging + +logger = logging.getLogger(__name__) +logger.setLevel('DEBUG') diff --git a/fmframework/io/csv.py b/fmframework/io/csv.py index b76dec7..8743fcc 100644 --- a/fmframework/io/csv.py +++ b/fmframework/io/csv.py @@ -1,26 +1,28 @@ -import csv +from csv import DictWriter import datetime - +import logging +from typing import List +from fmframework.model.fm import Scrobble +logger = logging.getLogger(__name__) headers = ['track', 'album', 'artist', 'time', 'track id', 'album id', 'artist id'] -def exportScrobbles(scrobbles, path): - date = str(datetime.datetime.now()).split(' ')[0] +def export_scrobbles(scrobbles: List[Scrobble], path: str): + logger.info(f'dumping {len(scrobbles)} to {path}') + date = str(datetime.date.today()) with open('{}/{}_scrobbles.csv'.format(path, date), 'w') as fileobj: - writer = csv.DictWriter(fileobj, fieldnames = headers) + writer = DictWriter(fileobj, fieldnames=headers) writer.writeheader() - for track in scrobbles: - if '@attr' not in track: - trackdict = { - 'track':track['name'].replace(';', '_').replace(',', '_'), - 'album':track['album']['#text'].replace(';', '_').replace(',', '_'), - 'artist':track['artist']['#text'].replace(';', '_').replace(',', '_'), - 'time': datetime.datetime.fromtimestamp(int(track['date']['uts'])), - 'track id':track['mbid'], - 'album id':track['album']['mbid'], - 'artist id':track['artist']['mbid']} - - writer.writerow(trackdict) + for scrobble in scrobbles: + writer.writerow({ + 'track': scrobble.track.name.replace(';', '_').replace(',', '_'), + 'album': scrobble.track.album.name.replace(';', '_').replace(',', '_'), + 'artist': scrobble.track.artist.name.replace(';', '_').replace(',', '_'), + 'time': scrobble.time, + 'track id': scrobble.track.mbid, + 'album id': scrobble.track.album.mbid, + 'artist id': scrobble.track.artist.mbid + }) diff --git a/fmframework/model/__init__.py b/fmframework/model/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fmframework/model/album.py b/fmframework/model/album.py new file mode 100644 index 0000000..c6495ff --- /dev/null +++ b/fmframework/model/album.py @@ -0,0 +1,46 @@ +from fmframework.model.fm import LastFM, Wiki +from fmframework.model.artist import Artist +from fmframework.util.console import Color + + +class Album(LastFM): + def __init__(self, + name: str = None, + url: str = None, + mbid: str = None, + listeners: int = None, + play_count: int = None, + user_scrobbles: int = None, + wiki: Wiki = None, + artist: Artist = None, + ): + super().__init__(name=name, + url=url, + mbid=mbid, + listeners=listeners, + play_count=play_count, + user_scrobbles=user_scrobbles, + wiki=wiki) + self.artist = artist + + def __str__(self): + return f'{self.name} / {self.artist}' + + def __repr__(self): + return super().__repr__() + Color.DARKCYAN + Color.BOLD + ' Album' + Color.END + f': {self.artist}' + + @staticmethod + def wrap(name: str = None, + artist: str = None, + url: str = None, + mbid: str = None, + listeners: int = None, + play_count: int = None, + user_scrobbles: int = None): + return Album(name=name, + artist=Artist(name=artist), + url=url, + mbid=mbid, + listeners=listeners, + play_count=play_count, + user_scrobbles=user_scrobbles) diff --git a/fmframework/model/artist.py b/fmframework/model/artist.py new file mode 100644 index 0000000..c9eddd7 --- /dev/null +++ b/fmframework/model/artist.py @@ -0,0 +1,26 @@ +from fmframework.util.console import Color +from fmframework.model.fm import LastFM, Wiki + + +class Artist(LastFM): + def __init__(self, + name: str, + url: str = None, + mbid: str = None, + listeners: int = None, + play_count: int = None, + user_scrobbles: int = None, + wiki: Wiki = None): + super().__init__(name=name, + url=url, + mbid=mbid, + listeners=listeners, + play_count=play_count, + user_scrobbles=user_scrobbles, + wiki=wiki) + + def __str__(self): + return f'{self.name}' + + def __repr__(self): + return super().__repr__() + Color.PURPLE + Color.BOLD + ' Artist' + Color.END diff --git a/fmframework/model/fm.py b/fmframework/model/fm.py new file mode 100644 index 0000000..7d51864 --- /dev/null +++ b/fmframework/model/fm.py @@ -0,0 +1,61 @@ +from __future__ import annotations +from fmframework.util.console import Color +from datetime import datetime + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from fmframework.model.track import Track + + +class Wiki: + def __init__(self, + date: datetime = None, + summary: str = None, + content: str = None): + self.date = date + self.summary = summary + self.content = content + + def __repr__(self): + return Color.YELLOW + Color.BOLD + 'Wiki:' + Color.END + \ + f': {self.date}, {self.summary}, {self.content}' + + +class LastFM: + def __init__(self, + name: str = None, + url: str = None, + mbid: str = None, + listeners: int = None, + play_count: int = None, + user_scrobbles: int = None, + wiki: Wiki = None): + self.name = name + self.url = url, + self.mbid = mbid, + self.listeners = listeners + self.play_count = play_count + self.user_scrobbles = user_scrobbles + self.wiki = wiki + + def __str__(self): + return self.name + + def __repr__(self): + return Color.RED + Color.BOLD + 'LastFM' + Color.END + \ + f': {self.name}, user({self.user_scrobbles}), play_count({self.play_count}), ' \ + f'listeners({self.listeners}), wiki({self.wiki})' + + +class Scrobble: + def __init__(self, + track: Track = None, + time: datetime = None): + self.track = track + self.time = time + + def __str__(self): + return self.track + + def __repr__(self): + return Color.BLUE + Color.BOLD + 'Scrobble' + Color.END + f': {self.time} {repr(self.track)}' diff --git a/fmframework/model/track.py b/fmframework/model/track.py new file mode 100644 index 0000000..95d14d2 --- /dev/null +++ b/fmframework/model/track.py @@ -0,0 +1,53 @@ +from fmframework.model.fm import LastFM, Wiki +from fmframework.model.album import Album +from fmframework.model.artist import Artist +from fmframework.util.console import Color + + +class Track(LastFM): + def __init__(self, + name: str = None, + url: str = None, + mbid: str = None, + listeners: int = None, + play_count: int = None, + user_scrobbles: int = None, + wiki: Wiki = None, + album: Album = None, + artist: Artist = None, + ): + super().__init__(name=name, + url=url, + mbid=mbid, + listeners=listeners, + play_count=play_count, + user_scrobbles=user_scrobbles, + wiki=wiki) + self.album = album + self.artist = artist + + def __str__(self): + return f'{self.name} / {self.album} / {self.artist}' + + def __repr__(self): + return super().__repr__() + Color.YELLOW + Color.BOLD + ' Track' + Color.END + \ + f': album({repr(self.album)}), artist({repr(self.artist)})' + + @staticmethod + def wrap(name: str = None, + artist: str = None, + album: str = None, + album_artist: str = None, + url: str = None, + mbid: str = None, + listeners: int = None, + play_count: int = None, + user_scrobbles: int = None): + return Track(name=name, + album=Album.wrap(name=album, artist=album_artist), + artist=Artist(artist), + url=url, + mbid=mbid, + listeners=listeners, + play_count=play_count, + user_scrobbles=user_scrobbles) diff --git a/fmframework/net/network.py b/fmframework/net/network.py new file mode 100644 index 0000000..bf09da6 --- /dev/null +++ b/fmframework/net/network.py @@ -0,0 +1,264 @@ +import requests +from typing import Optional, List +from copy import deepcopy +import logging +from datetime import datetime + +from fmframework.model.fm import Scrobble, Wiki +from fmframework.model.track import Track +from fmframework.model.album import Album +from fmframework.model.artist import Artist + +logger = logging.getLogger(__name__) + + +class Network: + + def __init__(self, username, api_key): + self.api_key = api_key + + self.username = username + self.retry_counter = 0 + + def get_request(self, + method: str, + params: dict = None) -> Optional[dict]: + + data = { + "format": 'json', + "method": method, + "api_key": self.api_key, + } + if params is not None: + data.update(params) + + req = requests.get('http://ws.audioscrobbler.com/2.0/', params=data) + + if 200 <= req.status_code < 300: + logger.debug(f'{method} {req.status_code}') + return req.json() + else: + resp = req.json() + + code = resp.get('error', None) + message = resp.get('message', None) + + if code: + if code == 8: + if self.retry_counter < 5: + self.retry_counter += 1 + logger.warning(f'{method} {req.status_code} {code} {message} retyring') + return self.get_request(method, params) + else: + self.retry_counter = 0 + logger.error(f'{method} {req.status_code} {code} {message} retry limit reached') + else: + logger.error(f'{method} {req.status_code} {code} {message} retry limit reached') + else: + if message: + logger.error(f'{method} {req.status_code} {message}') + else: + logger.error(f'{method} {req.status_code}') + + def get_recent_tracks(self, + username: str = None, + limit: int = None, + from_time: datetime = None, + to_time: datetime = None) -> Optional[List[Scrobble]]: + if limit is not None: + logger.info(f'pulling {limit} tracks') + else: + logger.info(f'pulling all tracks') + + params = { + 'user': self.username if username is None else username + } + + if from_time is not None: + params['from'] = from_time.timestamp() + if to_time is not None: + params['to'] = to_time.timestamp() + + iterator = PageCollection(net=self, method='user.getrecenttracks', params=params, response_limit=limit) + iterator.response_limit = limit + iterator.load() + + return [self.parse_scrobble(i) for i in iterator.items] + + @staticmethod + def parse_wiki(wiki_dict) -> Optional[Wiki]: + if wiki_dict: + return Wiki(date=datetime.strptime(wiki_dict.get('published', None), '%d %b %Y, %H:%M'), + summary=wiki_dict.get('summary', None), + content=wiki_dict.get('content', None)) + else: + return None + + def parse_artist(self, artist_dict) -> Artist: + return Artist(name=artist_dict.get('name', 'n/a'), + url=artist_dict.get('url', None), + mbid=artist_dict.get('mbid', None), + listeners=artist_dict.get('listeners', None), + play_count=artist_dict.get('playcount', None), + user_scrobbles=artist_dict.get('userplaycount', None), + wiki=self.parse_wiki(artist_dict['wiki']) if artist_dict.get('wiki', None) else None) + + def parse_album(self, album_dict) -> Album: + return Album(name=album_dict.get('name', 'n/a'), + url=album_dict.get('url', 'n/a'), + mbid=album_dict.get('mbid', 'n/a'), + listeners=album_dict.get('listeners', 'n/a'), + play_count=album_dict.get('playcount', 'n/a'), + user_scrobbles=album_dict.get('userplaycount', 'n/a'), + wiki=self.parse_wiki(album_dict['wiki']) if album_dict.get('wiki', None) else None, + artist=album_dict.get('artist')) + + def parse_track(self, track_dict): + track = Track(name=track_dict.get('name', 'n/a'), + url=track_dict.get('url', 'n/a'), + mbid=track_dict.get('mbid', 'n/a'), + listeners=track_dict.get('listeners', 'n/a'), + play_count=track_dict.get('playcount', 'n/a'), + user_scrobbles=track_dict.get('userplaycount', 'n/a'), + wiki=self.parse_wiki(track_dict['wiki']) if track_dict.get('wiki', None) else None) + + if track_dict.get('album', None): + track.album = self.parse_album(track_dict['album']) + + if track_dict.get('artist', None): + track.album = self.parse_album(track_dict['artist']) + + return track + + @staticmethod + def parse_scrobble(scrobble_dict): + album = None + if scrobble_dict.get('album', None): + album = Album(name=scrobble_dict['album'].get('#text', 'n/a'), + mbid=scrobble_dict['album'].get('mbid', None)) + + artist = None + if scrobble_dict.get('artist', None): + artist = Artist(name=scrobble_dict['artist'].get('#text', 'n/a'), + mbid=scrobble_dict['artist'].get('mbid', None)) + + if artist is not None and album is not None: + if album.artist is None: + album.artist = artist + + track = Track(name=scrobble_dict.get('name', 'n/a'), + album=album, + artist=artist, + mbid=scrobble_dict.get('mbid', None), + url=scrobble_dict.get('url', None)) + + return Scrobble(track=track, time=datetime.fromtimestamp(int(scrobble_dict['date']['uts']))) + + +class PageCollection: + def __init__(self, + net: Network, + method: str, + params: dict = None, + page_limit: int = 50, + response_limit: int = 50): + self.net = net + self.method = method + self.params = params + self.pages: List[Page] = [] + self.page_limit = page_limit + self.response_limit = response_limit + self.counter = 1 + + def __len__(self): + length = 0 + for page in self.pages: + length += len(page.items) + return length + + @property + def total(self): + if len(self.pages) > 0: + return self.pages[0].total + return 0 + + @property + def items(self): + items = [] + for page in self.pages: + items += page.items + return items[:self.response_limit] + + def load(self): + if self.response_limit: + tracker = True + while len(self) < self.response_limit and tracker: + page = self.iterate() + if len(page) == 0: + tracker = False + else: + self.pages.append(page) + else: + tracker = True + while tracker: + page = self.iterate() + if len(page) == 0: + tracker = False + else: + self.pages.append(page) + + def iterate(self): + logger.debug(f'iterating {self.method}') + self.counter += 1 + + params = deepcopy(self.params) + + params.update({ + 'limit': self.page_limit, + 'page': self.counter + }) + resp = self.net.get_request(method=self.method, params=params) + + if resp: + return self.parse_page(resp) + # if len(page) > 0: + # if self.response_limit: + # if len(self) < self.response_limit: + # self.iterate() + # else: + # self.iterate() + else: + logger.error('no response') + + def add_page(self, page_dict): + page = self.parse_page(page_dict) + self.pages.append(page) + return page + + @staticmethod + def parse_page(page_dict): + first_value = list(page_dict.values())[0] + items = list(first_value.values())[1] + return Page( + number=first_value['@attr'].get('page', None), + size=first_value['@attr'].get('perPage', None), + total=first_value['@attr'].get('total', None), + total_pages=first_value['@attr'].get('totalPages', None), + items=items) + + +class Page: + def __init__(self, + number: int, + size: int, + total: int, + total_pages: int, + items: list): + self.number = number + self.size = size + self.total = total + self.total_pages = total_pages + self.items = items + + def __len__(self): + return len(self.items) diff --git a/fmframework/net/user.py b/fmframework/net/user.py deleted file mode 100644 index 145c407..0000000 --- a/fmframework/net/user.py +++ /dev/null @@ -1,52 +0,0 @@ -import os -import requests - -class User: - - def __init__(self, username, pagesize = 200): - self.api_key = os.environ['FMKEY'] - - self.username = username - self.pagesize = pagesize - - def __makeRequest(self, method, extra = {}, page = 1): - - data = { - "format": 'json', - "method": method, - "limit": self.pagesize, - "page": page, - "user": self.username, - "api_key": self.api_key - } - data.update(extra) - - req = requests.get('http://ws.audioscrobbler.com/2.0/', params = data) - - if req.status_code < 200 or req.status_code > 299: - - if req.json()['error'] == 8: - print('ERROR: retrying call ' + method) - return self.__makeRequest(method, extra, page) - else: - raise ValueError('HTTP Error Raised: ' + str(req.json()['error']) + ' ' + req.json()['message']) - - return req.json() - - def getRecentTracks(self, offset = 1, pagelimit = 0): - - scrobbles = [] - - print(str(offset) + ' offset') - - json = self.__makeRequest('user.getrecenttracks', page = offset) - scrobbles += json['recenttracks']['track'] - - if pagelimit > 0: - if offset < pagelimit and offset < int(json['recenttracks']['@attr']['totalPages']): - scrobbles += self.getRecentTracks(offset = offset + 1, pagelimit = pagelimit) - else: - if offset < int(json['recenttracks']['@attr']['totalPages']): - scrobbles += self.getRecentTracks(offset = offset + 1, pagelimit = pagelimit) - - return scrobbles diff --git a/fmframework/util/__init__.py b/fmframework/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fmframework/util/console.py b/fmframework/util/console.py new file mode 100644 index 0000000..c27a63f --- /dev/null +++ b/fmframework/util/console.py @@ -0,0 +1,11 @@ +class Color: + PURPLE = '\033[95m' + CYAN = '\033[96m' + DARKCYAN = '\033[36m' + BLUE = '\033[94m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + END = '\033[0m' diff --git a/main.py b/main.py deleted file mode 100644 index 02d3b7d..0000000 --- a/main.py +++ /dev/null @@ -1,12 +0,0 @@ -import fmframework.net.user as user - -if __name__ == '__main__': - print('hello world') - - sarsoo = user.User('sarsoo') - - tracks = sarsoo.getRecentTracks() - print(len(tracks)) - - for track in tracks: - print(track['name'])