added artist+album scrobble scraping, added mock headers

This commit is contained in:
aj 2020-08-17 11:56:37 +01:00
parent 8ad8509f25
commit ee8471ee72
2 changed files with 103 additions and 33 deletions

View File

@ -0,0 +1 @@
from .network import Network, LastFMNetworkException

View File

@ -1,4 +1,4 @@
from datetime import date, datetime from datetime import date, datetime, timedelta
from typing import Union from typing import Union
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
@ -33,6 +33,30 @@ class LibraryScraper:
else: else:
raise TypeError(f'invalid period provided, {period} / {type(period)}') raise TypeError(f'invalid period provided, {period} / {type(period)}')
@staticmethod
def artist_scrobbles(username: str, artist: str, net: Network = None, whole_track=True,
from_date: datetime = None, to_date: datetime = None,
date_preset: str = None, reverse: bool = False):
logger.info(f"loading {artist}'s tracks for {username}")
artists_tracks = LibraryScraper.artist_tracks(username=username, artist=artist, net=net,
whole_track=False, from_date=from_date, to_date=to_date,
date_preset=date_preset)
scrobbles = []
for track in artists_tracks:
tracks_scrobbles = LibraryScraper.track_scrobbles(username=username, artist=artist, track=track.name,
net=net, whole_track=whole_track,
from_date=from_date, to_date=to_date,
date_preset=date_preset)
if tracks_scrobbles is not None:
scrobbles += tracks_scrobbles
else:
logger.warning(f'no scrobbles returned for {track.name} / {track.artist.name} / {username}')
return sorted(scrobbles, key=lambda x: x.time, reverse=reverse)
@staticmethod @staticmethod
def artist_tracks(username: str, artist: str, net: Network = None, whole_track=True, def artist_tracks(username: str, artist: str, net: Network = None, whole_track=True,
from_date: datetime = None, to_date: datetime = None, from_date: datetime = None, to_date: datetime = None,
@ -164,6 +188,34 @@ class LibraryScraper:
else: else:
logger.error(f'no albums returned for page 1 of {artist} / {username}') logger.error(f'no albums returned for page 1 of {artist} / {username}')
@staticmethod
def album_scrobbles(username: str, artist: str, album: str, net: Network = None, whole_track=True,
from_date: datetime = None, to_date: datetime = None,
date_preset: str = None, reverse: bool = False):
logger.info(f"loading {album} / {artist}'s tracks for {username}")
albums_tracks = LibraryScraper.album_tracks(username=username, artist=artist, album=album, net=net,
whole_track=False, from_date=from_date, to_date=to_date,
date_preset=date_preset)
if albums_tracks is None:
logger.error(f'no tracks returned for {album} / {artist} / {username}')
return
scrobbles = []
for track in albums_tracks:
tracks_scrobbles = LibraryScraper.track_scrobbles(username=username, artist=artist, track=track.name,
net=net, whole_track=whole_track,
from_date=from_date, to_date=to_date,
date_preset=date_preset)
if tracks_scrobbles is not None:
scrobbles += tracks_scrobbles
else:
logger.warning(f'no scrobbles returned for {track.name} / {track.artist.name} / {username}')
return sorted(scrobbles, key=lambda x: x.time, reverse=reverse)
@staticmethod @staticmethod
def album_tracks(username: str, artist: str, album: str, net: Network = None, whole_track=True, def album_tracks(username: str, artist: str, album: str, net: Network = None, whole_track=True,
from_date: datetime = None, to_date: datetime = None, from_date: datetime = None, to_date: datetime = None,
@ -232,6 +284,7 @@ class LibraryScraper:
else: else:
logger.error(f'no tracks returned for page 1 of {album} / {artist} / {username}') logger.error(f'no tracks returned for page 1 of {album} / {artist} / {username}')
# TODO cache pulled album to reduce requests
@staticmethod @staticmethod
def track_scrobbles(username: str, artist: str, track: str, net: Network = None, whole_track=True, def track_scrobbles(username: str, artist: str, track: str, net: Network = None, whole_track=True,
from_date: datetime = None, to_date: datetime = None, from_date: datetime = None, to_date: datetime = None,
@ -245,19 +298,21 @@ class LibraryScraper:
if whole_track and net is None: if whole_track and net is None:
raise NameError('Network required for populating tracks') raise NameError('Network required for populating tracks')
populated_tracks = [] populated_scrobbles = []
if tracks is not None: if tracks is not None:
if whole_track: if whole_track:
for track in tracks: for scrobble in tracks:
pulled_track = net.track(name=track.track.name, pulled_scrobble = net.track(name=scrobble.track.name,
artist=track.track.artist.name, artist=scrobble.track.artist.name,
username=username) username=username)
pulled_track.album = net.album(name=track.track.album.name, pulled_scrobble.album = net.album(name=scrobble.track.album.name,
artist=track.track.album.name, artist=scrobble.track.album.artist.name,
username=username) username=username)
populated_tracks.append(pulled_track)
return populated_tracks scrobble.track = pulled_scrobble
populated_scrobbles.append(scrobble)
return populated_scrobbles
else: else:
return tracks return tracks
else: else:
@ -301,11 +356,14 @@ class LibraryScraper:
timestamp_parts = [i.strip() for i in scrobble_timestamp.string.split(', ')] timestamp_parts = [i.strip() for i in scrobble_timestamp.string.split(', ')]
if len(timestamp_parts) == 1: if len(timestamp_parts) == 1:
scrobble_datetime = datetime.strptime(timestamp_parts[0], '%d %b %I:%M%p') try:
scrobble_datetime = datetime.strptime(timestamp_parts[0], '%d %b %I:%M%p') # this year
scrobble_datetime = scrobble_datetime.replace(year=date.today().year) scrobble_datetime = scrobble_datetime.replace(year=date.today().year)
except ValueError:
scrobble_datetime = datetime.now() - timedelta(hours=int(timestamp_parts[0][0])) # X hours ago
elif len(timestamp_parts) == 2: elif len(timestamp_parts) == 2:
recombined = ' '.join(timestamp_parts) recombined = ' '.join(timestamp_parts)
scrobble_datetime = datetime.strptime(recombined, '%d %b %Y %I:%M%p') scrobble_datetime = datetime.strptime(recombined, '%d %b %Y %I:%M%p') # previous year
else: else:
scrobble_datetime = None scrobble_datetime = None
logger.error(f'{len(timestamp_parts)} timestamp parts found, {timestamp_parts}') logger.error(f'{len(timestamp_parts)} timestamp parts found, {timestamp_parts}')
@ -366,22 +424,26 @@ class LibraryScraper:
else: else:
raise TypeError(f'invalid period provided, {date_preset} / {type(date_preset)}') raise TypeError(f'invalid period provided, {date_preset} / {type(date_preset)}')
html = LibraryScraper.rsession.get(url) headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-GB,en;q=0.5",
"DNT": "1",
"Host": "www.last.fm",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:75.0) Gecko/20100101 Firefox/75.0",
}
html = LibraryScraper.rsession.get(url, headers=headers)
if 200 <= html.status_code < 300: if 200 <= html.status_code < 300:
parser = BeautifulSoup(html.content, 'html.parser') parser = BeautifulSoup(html.content, 'html.parser')
list_section = parser.find('table', class_='chartlist') objs = [i for i in parser.find_all('tr') if i.find('td', class_='chartlist-name')]
if list_section:
objs = [i for i in list_section.tbody.find_all('tr') if i.find('td', class_='chartlist-name')]
if include_pages: if include_pages:
return objs, len(parser.find_all('li', class_='pagination-page')) return objs, len(parser.find_all('li', class_='pagination-page'))
else: else:
return objs return objs
else:
logger.error(f'no objects scrobbled for {artist} by {username}')
else: else:
logger.error(f'HTTP error occurred {html.status_code}') logger.error(f'HTTP error occurred {html.status_code}')
@ -430,16 +492,23 @@ class UserScraper:
logger.debug(f'loading page {page} from {from_date} to {to_date} for {username}') logger.debug(f'loading page {page} from {from_date} to {to_date} for {username}')
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-GB,en;q=0.5",
"DNT": "1",
"Host": "www.last.fm",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:75.0) Gecko/20100101 Firefox/75.0",
}
html = UserScraper.rsession.get(f'https://www.last.fm/user/{username}/library/albums' html = UserScraper.rsession.get(f'https://www.last.fm/user/{username}/library/albums'
f'?from={from_date.strftime("%Y-%m-%d")}' f'?from={from_date.strftime("%Y-%m-%d")}'
f'&to={to_date.strftime("%Y-%m-%d")}' f'&to={to_date.strftime("%Y-%m-%d")}'
f'&page={page}') f'&page={page}',
headers=headers)
if 200 <= html.status_code < 300: if 200 <= html.status_code < 300:
parser = BeautifulSoup(html.content, 'html.parser') parser = BeautifulSoup(html.content, 'html.parser')
rows = parser.find_all('tr', 'chartlist-row')
chart_section = parser.find('section', id='top-albums-section')
rows = chart_section.find_all('tr', 'chartlist-row')
albums = [] albums = []
for row in rows: for row in rows: