From ebe2fc94bfc235c410a222fffb57e9630eebec83 Mon Sep 17 00:00:00 2001
From: aj
Date: Sun, 9 Aug 2020 12:50:15 +0100
Subject: [PATCH] added scraping methods for endpoints unavailable in the api,
 added duration field to track

---
 fmframework/chart/__init__.py |  84 -------
 fmframework/image/__init__.py |  12 +-
 fmframework/model/__init__.py |   1 +
 fmframework/net/network.py    |   1 +
 fmframework/net/scrape.py     | 438 ++++++++++++++++++++++++++++++++++
 5 files changed, 446 insertions(+), 90 deletions(-)
 delete mode 100644 fmframework/chart/__init__.py
 create mode 100644 fmframework/net/scrape.py

diff --git a/fmframework/chart/__init__.py b/fmframework/chart/__init__.py
deleted file mode 100644
index 189951f..0000000
--- a/fmframework/chart/__init__.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from bs4 import BeautifulSoup
-import requests
-from datetime import date
-
-from fmframework.model import Album, Artist
-from fmframework.net.network import Network, LastFMNetworkException
-
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-def get_populated_album_chart(net: Network, username: str, from_date: date, to_date: date, limit: int):
-    """Scrape chart from last.fm frontend before pulling each from the backend for a complete object"""
-
-    chart = get_scraped_album_chart(username or net.username, from_date, to_date, limit)
-    logger.info('populating scraped albums')
-    albums = []
-    for counter, scraped in enumerate(chart):
-        logger.debug(f'populating {counter+1} of {len(chart)}')
-        try:
-            albums.append(net.get_album(name=scraped.name, artist=scraped.artist.name))
-        except LastFMNetworkException:
-            logger.exception(f'error occured during album retrieval')
-
-    return albums
-
-
-def get_scraped_album_chart(username: str, from_date: date, to_date: date, limit: int):
-    """Scrape 'light' objects from last.fm frontend based on date range and limit"""
-
-    logger.info(f'scraping album chart from {from_date} to {to_date} for {username}')
-
-    pages = int(limit / 50)
-    if limit % 50 != 0:
-        pages += 1
-
-    albums = []
-    for i in range(pages):
-        scraped_albums = get_scraped_album_chart_page(username, from_date, to_date, i + 1)
-        if scraped_albums is not None:
-            albums += scraped_albums
-
-    return albums[:limit]
-
-
-def get_scraped_album_chart_page(username: str, from_date: date, to_date: date, page: int):
-    """Scrape 'light' objects single page of last.fm frontend based on date range"""
-
-    logger.debug(f'loading page {page} from {from_date} to {to_date} for {username}')
-
-    html = requests.get(f'https://www.last.fm/user/{username}/library/albums'
-                        f'?from={from_date.strftime("%Y-%m-%d")}'
-                        f'&to={to_date.strftime("%Y-%m-%d")}'
-                        f'&page={page}')
-    if 200 <= html.status_code < 300:
-        parser = BeautifulSoup(html.content, 'html.parser')
-
-        chart_section = parser.find('section', id='top-albums-section')
-
-        rows = chart_section.find_all('tr', 'chartlist-row')
-
-        albums = []
-        for row in rows:
-            names = row.find_all('a', title=True)
-            album_name = names[0]['title']
-            artist_name = names[1]['title']
-
-            scrobble_tag = row.find('span', {"class": "chartlist-count-bar-value"})
-            scrobble_count = [int(s) for s in scrobble_tag.contents[0].split() if s.isdigit()]
-
-            if len(scrobble_count) != 1:
-                logger.error('no scrobble count integers found')
-                scrobble_count = 0
-            else:
-                scrobble_count = scrobble_count[0]
-
-            artist = Artist(name=artist_name)
-            album = Album(name=album_name, artist=artist, user_scrobbles=scrobble_count)
-            albums.append(album)
-
-        return albums
-    else:
-        logger.error(f'HTTP error occurred {html.status_code}')
diff --git a/fmframework/image/__init__.py b/fmframework/image/__init__.py
index d166d7c..ee0b96c 100644
--- a/fmframework/image/__init__.py
+++ b/fmframework/image/__init__.py
@@ -3,7 +3,7 @@ from typing import List
 from datetime import date
 
 from fmframework.net.network import Network
-from fmframework.chart import get_populated_album_chart
+from fmframework.net.scrape import UserScraper
 from fmframework.image.downloader import Downloader, ImageSizeNotAvailableException
 from fmframework.model import Image
 
@@ -121,11 +121,11 @@ class AlbumChartCollage:
                           image_width: int = 5,
                           check_cache=True,
                           cache=True):
-        chart = get_populated_album_chart(net=net,
-                                          username=username,
-                                          from_date=from_date,
-                                          to_date=to_date,
-                                          limit=limit)
+        chart = UserScraper.get_album_chart(net=net,
+                                            username=username,
+                                            from_date=from_date,
+                                            to_date=to_date,
+                                            limit=limit)
         return get_image_grid_from_objects(objects=chart,
                                            image_size=image_size,
                                            image_width=image_width,
diff --git a/fmframework/model/__init__.py b/fmframework/model/__init__.py
index c49a76d..b005eca 100644
--- a/fmframework/model/__init__.py
+++ b/fmframework/model/__init__.py
@@ -73,6 +73,7 @@ class Album(LastFM):
 class Track(LastFM):
     album: Album = None
     artist: Artist = None
+    duration: int = None
 
     def __str__(self):
         return f'{self.name} / {self.album} / {self.artist}'
diff --git a/fmframework/net/network.py b/fmframework/net/network.py
index 0925983..62bc0db 100644
--- a/fmframework/net/network.py
+++ b/fmframework/net/network.py
@@ -367,6 +367,7 @@ class Network:
                      mbid=track_dict.get('mbid', 'n/a'),
                      listeners=int(track_dict.get('listeners', 0)),
                      play_count=int(track_dict.get('playcount', 0)),
+                     duration=int(track_dict['duration']) if track_dict.get('duration') else None,
                      user_scrobbles=int(track_dict.get('userplaycount', 0)),
                      wiki=self.parse_wiki(track_dict['wiki']) if track_dict.get('wiki', None) else None,
                      images=[self.parse_image(i) for i in track_dict.get('image', [])])
diff --git a/fmframework/net/scrape.py b/fmframework/net/scrape.py
new file mode 100644
index 0000000..986daef
--- /dev/null
+++ b/fmframework/net/scrape.py
@@ -0,0 +1,438 @@
+from datetime import date, datetime
+
+from bs4 import BeautifulSoup
+from requests import Session
+from urllib import parse
+
+from fmframework.model import Track, Artist, Album, Scrobble
+from fmframework.net.network import Network, LastFMNetworkException
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+class LibraryScraper:
+    rsession = Session()
+
+    @staticmethod
+    def get_scrobbled_tracks(username: str, artist: str, net: Network = None, whole_track=True,
+                             from_date: datetime = None, to_date: datetime = None,
+                             date_preset: str = None):
+        logger.info(f"loading {artist}'s tracks for {username}")
+
+        tracks = LibraryScraper.get_scraped_scrobbled_tracks(username=username, artist=artist,
+                                                             from_date=from_date, to_date=to_date,
+                                                             date_preset=date_preset)
+
+        if whole_track and net is None:
+            raise ValueError('Network required for populating tracks')
+
+        populated_tracks = []
+        if tracks is not None:
+            if whole_track:
+                for track in tracks:
+                    populated_tracks.append(net.get_track(name=track.name, artist=track.artist.name, username=username))
+
+                return populated_tracks
+            else:
+                return tracks
+        else:
+            logger.error(f'no scraped tracks returned for {artist} / {username}')
+
+    @staticmethod
+    def get_scraped_scrobbled_tracks(username: str, artist: str,
+                                     from_date: datetime = None, to_date: datetime = None,
+                                     date_preset: str = None):
+        logger.info(f'scraping {artist} tracks for {username}')
+
+        page1 = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist, page=1,
+                                                          url_key='tracks', include_pages=True,
+                                                          from_date=from_date, to_date=to_date,
+                                                          date_preset=date_preset)
+
+        if page1 is not None:
+            tracks = page1[0]
+            for page_number in range(page1[1] - 1):
+
+                page = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist,
+                                                                 url_key='tracks',
+                                                                 page=page_number + 2,
+                                                                 from_date=from_date, to_date=to_date,
+                                                                 date_preset=date_preset)
+
+                if page is not None:
+                    tracks += page
+                else:
+                    logger.error(f'no tracks returned for {artist} / {username}')
+
+            track_objects = []
+            for row in tracks:
+                name_cell = row.find('td', class_='chartlist-name').find('a')
+                count_cell = row.find(class_='chartlist-count-bar-value')
+
+                track_objects.append(Track(name=name_cell.string,
+                                           artist=Artist(name=artist),
+                                           url=name_cell['href'],
+                                           user_scrobbles=int(count_cell.contents[0].strip())))
+
+            return track_objects
+        else:
+            logger.error(f'no tracks returned for page 1 of {artist} / {username}')
+
+
+    @staticmethod
+    def get_scrobbled_albums(username: str, artist: str, net: Network = None, whole_album=True,
+                             from_date: datetime = None, to_date: datetime = None,
+                             date_preset: str = None):
+        logger.info(f"loading {artist}'s albums for {username}")
+
+        albums = LibraryScraper.get_scraped_scrobbled_albums(username=username, artist=artist,
+                                                             from_date=from_date, to_date=to_date,
+                                                             date_preset=date_preset)
+
+        if whole_album and net is None:
+            raise ValueError('Network required for populating albums')
+
+        populated_albums = []
+        if albums is not None:
+            if whole_album:
+                for album in albums:
+                    populated_albums.append(net.get_album(name=album.name, artist=album.artist.name, username=username))
+
+                return populated_albums
+            else:
+                return albums
+        else:
+            logger.error(f'no scraped albums returned for {artist} / {username}')
+
+    @staticmethod
+    def get_scraped_scrobbled_albums(username: str, artist: str,
+                                     from_date: datetime = None, to_date: datetime = None,
+                                     date_preset: str = None):
+        logger.info(f'scraping {artist} albums for {username}')
+
+        page1 = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist, page=1,
+                                                          url_key='albums',
+                                                          include_pages=True,
+                                                          from_date=from_date, to_date=to_date,
+                                                          date_preset=date_preset)
+
+        if page1 is not None:
+            albums = page1[0]
+            for page_number in range(page1[1] - 1):
+
+                page = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist,
+                                                                 url_key='albums',
+                                                                 page=page_number + 2,
+                                                                 from_date=from_date, to_date=to_date,
+                                                                 date_preset=date_preset)
+
+                if page is not None:
+                    albums += page
+                else:
+                    logger.error(f'no albums returned for {artist} / {username}')
+
+            album_objects = []
+            for row in albums:
+                name_cell = row.find('td', class_='chartlist-name').find('a')
+                count_cell = row.find(class_='chartlist-count-bar-value')
+
+                album_objects.append(Album(name=name_cell.string,
+                                           artist=Artist(name=artist),
+                                           user_scrobbles=int(count_cell.contents[0].strip()),
+                                           url=name_cell['href']))
+
+            return album_objects
+        else:
+            logger.error(f'no albums returned for page 1 of {artist} / {username}')
+
+    @staticmethod
+    def get_albums_tracks(username: str, artist: str, album: str, net: Network = None, whole_track=True,
+                          from_date: datetime = None, to_date: datetime = None,
+                          date_preset: str = None):
+        logger.info(f"loading {artist}'s {album} tracks for {username}")
+
+        tracks = LibraryScraper.get_scraped_albums_tracks(username=username, artist=artist, album=album,
+                                                          from_date=from_date, to_date=to_date,
+                                                          date_preset=date_preset)
+
+        if whole_track and net is None:
+            raise ValueError('Network required for populating tracks')
+
+        populated_tracks = []
+        if tracks is not None:
+            if whole_track:
+                for track in tracks:
+                    populated_tracks.append(net.get_track(name=track.name, artist=track.artist.name, username=username))
+
+                return populated_tracks
+            else:
+                return tracks
+        else:
+            logger.error(f'no scraped tracks returned for {album} / {artist} / {username}')
+
+    @staticmethod
+    def get_scraped_albums_tracks(username: str, artist: str, album: str,
+                                  from_date: datetime = None, to_date: datetime = None,
+                                  date_preset: str = None):
+        logger.info(f'scraping {album} / {artist} tracks for {username}')
+
+        page1 = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist, page=1,
+                                                          album=album,
+                                                          include_pages=True,
+                                                          from_date=from_date, to_date=to_date,
+                                                          date_preset=date_preset)
+
+        if page1 is not None:
+            rows = page1[0]
+            for page_number in range(page1[1] - 1):
+
+                page = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist,
+                                                                 album=album,
+                                                                 page=page_number + 2,
+                                                                 from_date=from_date, to_date=to_date,
+                                                                 date_preset=date_preset)
+
+                if page is not None:
+                    rows += page
+                else:
+                    logger.error(f'no tracks returned for {album} / {artist} / {username}')
+
+            track_objects = []
+            for row in rows:
+                name_cell = row.find('td', class_='chartlist-name').find('a')
+                count_cell = row.find(class_='chartlist-count-bar-value')
+
+                artist_name = parse.unquote_plus(name_cell['href'].split('/')[2])
+
+                track_objects.append(Track(name=name_cell.string,
+                                           artist=Artist(name=artist_name),
+                                           url=name_cell['href'],
+                                           user_scrobbles=int(count_cell.contents[0].strip())))
+
+            return track_objects
+        else:
+            logger.error(f'no tracks returned for page 1 of {album} / {artist} / {username}')
+
+    @staticmethod
+    def get_track_scrobbles(username: str, artist: str, track: str, net: Network = None, whole_track=True,
+                            from_date: datetime = None, to_date: datetime = None,
+                            date_preset: str = None):
+        logger.info(f"loading {track} / {artist} for {username}")
+
+        scrobbles = LibraryScraper.get_scraped_track_scrobbles(username=username, artist=artist, track=track,
+                                                               from_date=from_date, to_date=to_date,
+                                                               date_preset=date_preset)
+
+        if whole_track and net is None:
+            raise ValueError('Network required for populating tracks')
+
+        populated_tracks = []
+        if scrobbles is not None:
+            if whole_track:
+                for scrobble in scrobbles:
+                    pulled_track = net.get_track(name=scrobble.track.name,
+                                                 artist=scrobble.track.artist.name,
+                                                 username=username)
+                    pulled_track.album = net.get_album(name=scrobble.track.album.name,
+                                                       artist=scrobble.track.album.artist.name,
+                                                       username=username)
+                    populated_tracks.append(pulled_track)
+
+                return populated_tracks
+            else:
+                return scrobbles
+        else:
+            logger.error(f'no scraped scrobbles returned for {track} / {artist} / {username}')
+
+    @staticmethod
+    def get_scraped_track_scrobbles(username: str, artist: str, track: str,
+                                    from_date: datetime = None, to_date: datetime = None,
+                                    date_preset: str = None):
+        logger.info(f'scraping {track} / {artist} scrobbles for {username}')
+
+        page1 = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist, page=1,
+                                                          track=track,
+                                                          include_pages=True,
+                                                          from_date=from_date, to_date=to_date,
+                                                          date_preset=date_preset)
+
+        if page1 is not None:
+            rows = page1[0]
+            for page_number in range(page1[1] - 1):
+
+                page = LibraryScraper.get_scraped_artist_subpage(username=username, artist=artist,
+                                                                 track=track,
+                                                                 page=page_number + 2,
+                                                                 from_date=from_date, to_date=to_date,
+                                                                 date_preset=date_preset)
+
+                if page is not None:
+                    rows += page
+                else:
+                    logger.error(f'no scrobbles returned for {track} / {artist} / {username}')
+
+            scrobbles = []
+            for row in rows:
+                name_cell = row.find('td', class_='chartlist-name').find('a')
+                album_cell = row.find('td', class_='chartlist-album').find('a')
+
+                album_artist_name = parse.unquote_plus(album_cell['href'].split('/')[2])
+                scrobble_timestamp = row.find('td', class_='chartlist-timestamp').find('span')
+
+                timestamp_parts = [i.strip() for i in scrobble_timestamp.string.split(', ')]
+
+                if len(timestamp_parts) == 1:
+                    scrobble_datetime = datetime.strptime(timestamp_parts[0], '%d %b %I:%M%p')
+                    scrobble_datetime = scrobble_datetime.replace(year=date.today().year)
+                elif len(timestamp_parts) == 2:
+                    recombined = ' '.join(timestamp_parts)
+                    scrobble_datetime = datetime.strptime(recombined, '%d %b %Y %I:%M%p')
+                else:
+                    scrobble_datetime = None
+                    logger.error(f'{len(timestamp_parts)} timestamp parts found, {timestamp_parts}')
+
+
+                scrobbles.append(Scrobble(track=Track(name=name_cell.string,
+                                                      artist=Artist(name=artist),
+                                                      album=Album(name=album_cell.string,
+                                                                  artist=Artist(name=album_artist_name)),
+                                                      url=name_cell['href']),
+                                          time=scrobble_datetime)
+                                 )
+
+            return scrobbles
+        else:
+            logger.error(f'no scrobbles returned for page 1 of {track} / {artist} / {username}')
+
+    @staticmethod
+    def get_scraped_artist_subpage(username: str, artist: str, page: int,
+
+                                   url_key: str = None,
+                                   album: str = None,
+                                   track: str = None,
+
+                                   include_pages=False,
+                                   from_date: datetime = None, to_date: datetime = None,
+                                   date_preset: str = None):
+        logger.debug(f'loading page {page} of {artist} for {username}')
+
+        url = f'https://www.last.fm/user/{username}/library/music/{parse.quote_plus(artist)}'
+
+        if album:
+            url += f'/{parse.quote_plus(album)}'
+        elif track:
+            url += f'/_/{parse.quote_plus(track)}'
+
+        if url_key:
+            url += f'/+{url_key}'
+
+        url += f'?page={page}'
+
+        if from_date and to_date:
+            url += f'&from={from_date.strftime("%Y-%m-%d")}&to={to_date.strftime("%Y-%m-%d")}'
+        elif date_preset:
+            date_preset = date_preset.strip().upper()
+            if date_preset not in ['LAST_7_DAYS', 'LAST_30_DAYS', 'LAST_90_DAYS',
+                                   'LAST_180_DAYS', 'LAST_365_DAYS', 'ALL']:
+                raise ValueError(f'date_preset {date_preset} is not an allowed value')
+
+            url += f'&date_preset={date_preset}'
+
+        html = LibraryScraper.rsession.get(url)
+
+        if 200 <= html.status_code < 300:
+            parser = BeautifulSoup(html.content, 'html.parser')
+
+            list_section = parser.find('table', class_='chartlist')
+
+            if list_section:
+                objs = [i for i in list_section.tbody.find_all('tr') if i.find('td', class_='chartlist-name')]
+
+                if include_pages:
+                    return (objs, len(parser.find_all('li', class_='pagination-page')))
+                else:
+                    return objs
+            else:
+                logger.error(f'no scrobbled objects found for {artist} by {username}')
+
+        else:
+            logger.error(f'HTTP error occurred {html.status_code}')
+
+class UserScraper:
+    rsession = Session()
+
+    @staticmethod
+    def get_album_chart(net: Network, username: str, from_date: date, to_date: date, limit: int):
+        """Scrape chart from last.fm frontend before pulling each from the backend for a complete object"""
+
+        chart = UserScraper.get_scraped_album_chart(username or net.username, from_date, to_date, limit)
+        logger.info('populating scraped albums')
+        albums = []
+        for counter, scraped in enumerate(chart):
+            logger.debug(f'populating {counter+1} of {len(chart)}')
+            try:
+                albums.append(net.get_album(name=scraped.name, artist=scraped.artist.name))
+            except LastFMNetworkException:
+                logger.exception('error occurred during album retrieval')
+
+        return albums
+
+    @staticmethod
+    def get_scraped_album_chart(username: str, from_date: date, to_date: date, limit: int):
+        """Scrape 'light' objects from last.fm frontend based on date range and limit"""
+
+        logger.info(f'scraping album chart from {from_date} to {to_date} for {username}')
+
+        pages = limit // 50
+        if limit % 50 != 0:
+            pages += 1
+
+        albums = []
+        for i in range(pages):
+            scraped_albums = UserScraper.get_scraped_album_chart_page(username, from_date, to_date, i + 1)
+            if scraped_albums is not None:
+                albums += scraped_albums
+
+        return albums[:limit]
+
+    @staticmethod
+    def get_scraped_album_chart_page(username: str, from_date: date, to_date: date, page: int):
+        """Scrape 'light' objects from a single page of the last.fm frontend based on date range"""
+
+        logger.debug(f'loading page {page} from {from_date} to {to_date} for {username}')
+
+        html = UserScraper.rsession.get(f'https://www.last.fm/user/{username}/library/albums'
+                                        f'?from={from_date.strftime("%Y-%m-%d")}'
+                                        f'&to={to_date.strftime("%Y-%m-%d")}'
+                                        f'&page={page}')
+        if 200 <= html.status_code < 300:
+            parser = BeautifulSoup(html.content, 'html.parser')
+
+            chart_section = parser.find('section', id='top-albums-section')
+
+            rows = chart_section.find_all('tr', 'chartlist-row')
+
+            albums = []
+            for row in rows:
+                names = row.find_all('a', title=True)
+                album_name = names[0]['title']
+                artist_name = names[1]['title']
+
+                scrobble_tag = row.find('span', {"class": "chartlist-count-bar-value"})
+                scrobble_count = [int(s) for s in scrobble_tag.contents[0].split() if s.isdigit()]
+
+                if len(scrobble_count) != 1:
+                    logger.error('no scrobble count integers found')
+                    scrobble_count = 0
+                else:
+                    scrobble_count = scrobble_count[0]
+
+                album = Album(name=album_name,
+                              artist=Artist(name=artist_name),
+                              user_scrobbles=scrobble_count)
+                albums.append(album)
+
+            return albums
+        else:
+            logger.error(f'HTTP error occurred {html.status_code}')
\ No newline at end of file
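
--
A minimal usage sketch of the new scrapers, assuming a Network constructed with a username and API key; the constructor signature and the 'USERNAME'/'API_KEY' and artist/track values below are placeholders, not part of this patch:

    from datetime import date

    from fmframework.net.network import Network
    from fmframework.net.scrape import LibraryScraper, UserScraper

    net = Network(username='USERNAME', api_key='API_KEY')  # assumed constructor signature

    # 'light' objects scraped straight from the last.fm frontend, no API calls
    albums = LibraryScraper.get_scrobbled_albums('USERNAME', 'Radiohead', whole_album=False)

    # per-scrobble history for one track, each play populated through the API
    scrobbles = LibraryScraper.get_track_scrobbles('USERNAME', 'Radiohead',
                                                   'Weird Fishes/Arpeggi', net=net)

    # date-bounded album chart, scraped from the frontend then populated via the API
    chart = UserScraper.get_album_chart(net, 'USERNAME',
                                        from_date=date(2020, 1, 1),
                                        to_date=date(2020, 8, 1),
                                        limit=20)

The scrape-only variants (whole_track=False / whole_album=False) avoid API rate limits at the cost of returning sparse objects: name, artist, URL and user scrobble count only.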