diff --git a/fmframework/net/__init__.py b/fmframework/net/__init__.py
index e69de29..0708c1d 100644
--- a/fmframework/net/__init__.py
+++ b/fmframework/net/__init__.py
@@ -0,0 +1 @@
+from .network import Network, LastFMNetworkException
diff --git a/fmframework/net/scrape.py b/fmframework/net/scrape.py
index 047ca9d..fa8e08a 100644
--- a/fmframework/net/scrape.py
+++ b/fmframework/net/scrape.py
@@ -1,4 +1,4 @@
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 from typing import Union
 
 from bs4 import BeautifulSoup
@@ -33,6 +33,30 @@ class LibraryScraper:
         else:
             raise TypeError(f'invalid period provided, {period} / {type(period)}')
 
+    @staticmethod
+    def artist_scrobbles(username: str, artist: str, net: Network = None, whole_track=True,
+                         from_date: datetime = None, to_date: datetime = None,
+                         date_preset: str = None, reverse: bool = False):
+        logger.info(f"loading {artist}'s tracks for {username}")
+
+        artists_tracks = LibraryScraper.artist_tracks(username=username, artist=artist, net=net,
+                                                      whole_track=False, from_date=from_date, to_date=to_date,
+                                                      date_preset=date_preset)
+
+        scrobbles = []
+        for track in artists_tracks:
+            tracks_scrobbles = LibraryScraper.track_scrobbles(username=username, artist=artist, track=track.name,
+                                                              net=net, whole_track=whole_track,
+                                                              from_date=from_date, to_date=to_date,
+                                                              date_preset=date_preset)
+
+            if tracks_scrobbles is not None:
+                scrobbles += tracks_scrobbles
+            else:
+                logger.warning(f'no scrobbles returned for {track.name} / {track.artist.name} / {username}')
+
+        return sorted(scrobbles, key=lambda x: x.time, reverse=reverse)
+
     @staticmethod
     def artist_tracks(username: str, artist: str, net: Network = None, whole_track=True,
                       from_date: datetime = None, to_date: datetime = None,
@@ -90,9 +114,9 @@ class LibraryScraper:
                 count_cell = album.find(class_='chartlist-count-bar-value')
 
                 track_objects.append(Track(name=name_cell.string,
-                                            artist=Artist(name=artist),
-                                            url=name_cell['href'],
-                                            user_scrobbles=int(count_cell.contents[0].strip())))
+                                           artist=Artist(name=artist),
+                                           url=name_cell['href'],
+                                           user_scrobbles=int(count_cell.contents[0].strip())))
 
             return track_objects
         else:
@@ -164,6 +188,34 @@ class LibraryScraper:
         else:
             logger.error(f'no albums returned for page 1 of {artist} / {username}')
 
+    @staticmethod
+    def album_scrobbles(username: str, artist: str, album: str, net: Network = None, whole_track=True,
+                        from_date: datetime = None, to_date: datetime = None,
+                        date_preset: str = None, reverse: bool = False):
+        logger.info(f"loading {album} / {artist}'s tracks for {username}")
+
+        albums_tracks = LibraryScraper.album_tracks(username=username, artist=artist, album=album, net=net,
+                                                    whole_track=False, from_date=from_date, to_date=to_date,
+                                                    date_preset=date_preset)
+
+        if albums_tracks is None:
+            logger.error(f'no tracks returned for {album} / {artist} / {username}')
+            return
+
+        scrobbles = []
+        for track in albums_tracks:
+            tracks_scrobbles = LibraryScraper.track_scrobbles(username=username, artist=artist, track=track.name,
+                                                              net=net, whole_track=whole_track,
+                                                              from_date=from_date, to_date=to_date,
+                                                              date_preset=date_preset)
+
+            if tracks_scrobbles is not None:
+                scrobbles += tracks_scrobbles
+            else:
+                logger.warning(f'no scrobbles returned for {track.name} / {track.artist.name} / {username}')
+
+        return sorted(scrobbles, key=lambda x: x.time, reverse=reverse)
+
     @staticmethod
     def album_tracks(username: str, artist: str, album: str, net: Network = None, whole_track=True,
                      from_date: datetime = None, to_date: datetime = None,
@@ -232,6 +284,7 @@ class LibraryScraper:
         else:
             logger.error(f'no tracks returned for page 1 of {album} / {artist} / {username}')
 
+    # TODO cache pulled album to reduce requests
     @staticmethod
     def track_scrobbles(username: str, artist: str, track: str, net: Network = None, whole_track=True,
                         from_date: datetime = None, to_date: datetime = None,
@@ -245,19 +298,21 @@ class LibraryScraper:
         if whole_track and net is None:
             raise NameError('Network required for populating tracks')
 
-        populated_tracks = []
+        populated_scrobbles = []
         if tracks is not None:
             if whole_track:
-                for track in tracks:
-                    pulled_track = net.track(name=track.track.name,
-                                             artist=track.track.artist.name,
-                                             username=username)
-                    pulled_track.album = net.album(name=track.track.album.name,
-                                                   artist=track.track.album.name,
-                                                   username=username)
-                    populated_tracks.append(pulled_track)
+                for scrobble in tracks:
+                    pulled_scrobble = net.track(name=scrobble.track.name,
+                                                artist=scrobble.track.artist.name,
+                                                username=username)
+                    pulled_scrobble.album = net.album(name=scrobble.track.album.name,
+                                                      artist=scrobble.track.album.artist.name,
+                                                      username=username)
 
-                return populated_tracks
+                    scrobble.track = pulled_scrobble
+                    populated_scrobbles.append(scrobble)
+
+                return populated_scrobbles
             else:
                 return tracks
         else:
@@ -301,11 +356,14 @@ class LibraryScraper:
                 timestamp_parts = [i.strip() for i in scrobble_timestamp.string.split(', ')]
 
                 if len(timestamp_parts) == 1:
-                    scrobble_datetime = datetime.strptime(timestamp_parts[0], '%d %b %I:%M%p')
-                    scrobble_datetime = scrobble_datetime.replace(year=date.today().year)
+                    try:
+                        scrobble_datetime = datetime.strptime(timestamp_parts[0], '%d %b %I:%M%p')  # this year
+                        scrobble_datetime = scrobble_datetime.replace(year=date.today().year)
+                    except ValueError:
+                        scrobble_datetime = datetime.now() - timedelta(hours=int(timestamp_parts[0].split(' ')[0]))  # X hours ago
                 elif len(timestamp_parts) == 2:
                     recombined = ' '.join(timestamp_parts)
-                    scrobble_datetime = datetime.strptime(recombined, '%d %b %Y %I:%M%p')
+                    scrobble_datetime = datetime.strptime(recombined, '%d %b %Y %I:%M%p')  # previous year
                 else:
                     scrobble_datetime = None
                     logger.error(f'{len(timestamp_parts)} timestamp parts found, {timestamp_parts}')
@@ -366,22 +424,26 @@ class LibraryScraper:
         else:
             raise TypeError(f'invalid period provided, {date_preset} / {type(date_preset)}')
 
-        html = LibraryScraper.rsession.get(url)
+        headers = {
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
+            "Accept-Encoding": "gzip, deflate, br",
+            "Accept-Language": "en-GB,en;q=0.5",
+            "DNT": "1",
+            "Host": "www.last.fm",
+            "Upgrade-Insecure-Requests": "1",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:75.0) Gecko/20100101 Firefox/75.0",
+        }
+        html = LibraryScraper.rsession.get(url, headers=headers)
 
         if 200 <= html.status_code < 300:
             parser = BeautifulSoup(html.content, 'html.parser')
 
-            list_section = parser.find('table', class_='chartlist')
+            objs = [i for i in parser.find_all('tr') if i.find('td', class_='chartlist-name')]
 
-            if list_section:
-                objs = [i for i in list_section.tbody.find_all('tr') if i.find('td', class_='chartlist-name')]
-
-                if include_pages:
-                    return objs, len(parser.find_all('li', class_='pagination-page'))
-                else:
-                    return objs
+            if include_pages:
+                return objs, len(parser.find_all('li', class_='pagination-page'))
             else:
-                logger.error(f'no objects scrobbled for {artist} by {username}')
+                return objs
         else:
             logger.error(f'HTTP error occurred {html.status_code}')
@@ -430,16 +492,23 @@ class UserScraper:
         logger.debug(f'loading page {page} from {from_date} to {to_date} for {username}')
 
+        headers = {
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
+            "Accept-Encoding": "gzip, deflate, br",
+            "Accept-Language": "en-GB,en;q=0.5",
+            "DNT": "1",
+            "Host": "www.last.fm",
+            "Upgrade-Insecure-Requests": "1",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:75.0) Gecko/20100101 Firefox/75.0",
+        }
         html = UserScraper.rsession.get(f'https://www.last.fm/user/{username}/library/albums'
                                         f'?from={from_date.strftime("%Y-%m-%d")}'
                                         f'&to={to_date.strftime("%Y-%m-%d")}'
-                                        f'&page={page}')
+                                        f'&page={page}',
+                                        headers=headers)
 
         if 200 <= html.status_code < 300:
             parser = BeautifulSoup(html.content, 'html.parser')
-
-            chart_section = parser.find('section', id='top-albums-section')
-
-            rows = chart_section.find_all('tr', 'chartlist-row')
+            rows = parser.find_all('tr', 'chartlist-row')
 
             albums = []
             for row in rows:
@@ -463,4 +532,4 @@ class UserScraper:
 
             return albums
         else:
-            logger.error(f'HTTP error occurred {html.status_code}')
\ No newline at end of file
+            logger.error(f'HTTP error occurred {html.status_code}')
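For reference, a minimal usage sketch of the new scrobble helper (not part of the patch). The username, artist and date window are placeholders; passing whole_track=False keeps the call scrape-only, so no Network instance is required, and it assumes the scraped scrobble objects expose .time and .track as they do elsewhere in the patch:

    from datetime import datetime

    from fmframework.net.scrape import LibraryScraper

    # placeholder account and window, for illustration only
    scrobbles = LibraryScraper.artist_scrobbles(username='some_user',
                                                artist='Radiohead',
                                                whole_track=False,
                                                from_date=datetime(2020, 1, 1),
                                                to_date=datetime(2020, 6, 1))

    for scrobble in scrobbles:
        print(scrobble.time, scrobble.track.name)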