added image grid generation and web scraping for arbitrary creation
This commit is contained in:
parent
3289e6b242
commit
626873a164
94
fmframework/chart/__init__.py
Normal file
94
fmframework/chart/__init__.py
Normal file
@ -0,0 +1,94 @@
|
||||
from bs4 import BeautifulSoup
|
||||
import requests
|
||||
from datetime import date
|
||||
|
||||
from fmframework.model.album import Album
|
||||
from fmframework.model.artist import Artist
|
||||
from fmframework.model.fm import Image
|
||||
from fmframework.net.network import Network
|
||||
import fmframework.image
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_album_chart_image(net: Network,
                          username: str,
                          from_date: date,
                          to_date: date,
                          limit: int = 20,
                          image_size: Image.Size = Image.Size.extralarge,
                          image_width: int = 5):
    """Scrape a user's album chart for the date range and render it as one grid image.

    Scrapes and populates up to `limit` albums, then tiles their covers
    `image_width` across at `image_size` resolution.
    """
    chart_albums = get_populated_album_chart(net=net,
                                             username=username,
                                             from_date=from_date,
                                             to_date=to_date,
                                             limit=limit)
    return fmframework.image.get_image_grid_from_objects(net=net,
                                                         objects=chart_albums,
                                                         image_size=image_size,
                                                         image_width=image_width)
|
||||
|
||||
|
||||
def get_populated_album_chart(net: Network, username: str, from_date: date, to_date: date, limit: int):
    """Scrape the user's album chart, then hydrate each scraped entry via the API.

    Returns a list of fully-populated Album objects from net.get_album.
    """
    scraped_chart = get_scraped_album_chart(username, from_date, to_date, limit)
    logger.info('populating scraped albums')

    total = len(scraped_chart)
    populated = []
    for index, scraped_album in enumerate(scraped_chart):
        logger.debug(f'populating {index} of {total}')
        populated.append(net.get_album(name=scraped_album.name, artist=scraped_album.artist.name))

    return populated
|
||||
|
||||
|
||||
def get_scraped_album_chart(username: str, from_date: date, to_date: date, limit: int):
    """Scrape up to `limit` albums from a user's last.fm library chart pages.

    Pages that fail to load (scraper returns None) are skipped; the result is
    truncated to `limit` entries.
    """
    logger.info(f'scraping album chart from {from_date} to {to_date} for {username}')

    # last.fm paginates the library view at 50 rows per page; integer ceiling
    # division avoids the float round-trip of int(limit / 50)
    pages = -(-limit // 50)

    albums = []
    for page_number in range(1, pages + 1):
        scraped_albums = get_scraped_album_chart_page(username, from_date, to_date, page_number)
        if scraped_albums is not None:
            albums += scraped_albums

    return albums[:limit]
|
||||
|
||||
|
||||
def get_scraped_album_chart_page(username: str, from_date: date, to_date: date, page: int):
    """Scrape one 50-row page of a user's last.fm album library chart.

    Returns a list of Album objects (artist and user_scrobbles populated from
    the page markup), or None on an HTTP error or when the page has no chart
    section (e.g. a page number past the final page).
    """
    logger.debug(f'loading page {page} from {from_date} to {to_date} for {username}')

    html = requests.get(f'https://www.last.fm/user/{username}/library/albums'
                        f'?from={from_date.strftime("%Y-%m-%d")}'
                        f'&to={to_date.strftime("%Y-%m-%d")}'
                        f'&page={page}')
    if not 200 <= html.status_code < 300:
        logger.error(f'HTTP error occurred {html.status_code}')
        return None

    parser = BeautifulSoup(html.content, 'html.parser')

    chart_section = parser.find('section', id='top-albums-section')
    if chart_section is None:
        # previously this fell through to .find_all on None and raised
        # AttributeError; an empty/overrun page now degrades gracefully
        logger.error('no chart section found on page')
        return None

    albums = []
    for row in chart_section.find_all('tr', 'chartlist-row'):
        # first titled anchor is the album, second is the artist
        names = row.find_all('a', title=True)
        album_name = names[0]['title']
        artist_name = names[1]['title']

        scrobble_tag = row.find('span', {"class": "chartlist-count-bar-value"})
        scrobble_count = [int(s) for s in scrobble_tag.contents[0].split() if s.isdigit()]

        if len(scrobble_count) != 1:
            logger.error('no scrobble count integers found')
            scrobble_count = 0
        else:
            scrobble_count = scrobble_count[0]

        artist = Artist(name=artist_name)
        album = Album(name=album_name, artist=artist, user_scrobbles=scrobble_count)
        albums.append(album)

    return albums
|
63
fmframework/image/__init__.py
Normal file
63
fmframework/image/__init__.py
Normal file
@ -0,0 +1,63 @@
|
||||
import numpy as np
|
||||
from typing import List
|
||||
from fmframework.net.network import Network, ImageSizeNotAvailableException
|
||||
from fmframework.model.fm import Image
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_blank_image(width, height):
    """Return an all-black uint8 colour canvas of the given pixel dimensions.

    Note the numpy axis order: the array shape is (height, width, 3).
    """
    return np.zeros(shape=(height, width, 3), dtype=np.uint8)
|
||||
|
||||
|
||||
def arrange_cover_grid(images: List[np.array], width: int = 5):
    """Tile images into a grid `width` covers across.

    A short final row is padded with a black image so every row matches the
    first row's width; rows are then stacked vertically into one array.
    """
    logger.debug(f'arranging {len(images)} images at width {width}')

    rows = []
    for start in range(0, len(images), width):
        row_images = images[start:start + width]

        row_img = row_images[0]
        for cover in row_images[1:]:
            row_img = np.concatenate((row_img, cover), axis=1)

        # handle incomplete final row: pad to the width of the first row
        if len(row_images) < width and rows:
            pad_width = rows[0].shape[1] - row_img.shape[1]
            pad_height = rows[0].shape[0]
            logger.debug(rows[0].shape)
            row_img = np.concatenate((row_img, get_blank_image(width=pad_width, height=pad_height)), axis=1)

        rows.append(row_img)

    final_img = rows[0]
    for extra_row in rows[1:]:
        final_img = np.concatenate((final_img, extra_row), axis=0)
    return final_img
|
||||
|
||||
|
||||
def get_image_grid_from_objects(net: Network, objects, image_size: Image.Size, image_width: int = 5):
    """Download each object's artwork at `image_size` and tile the results.

    Objects with no image at the requested size are logged and skipped.
    """
    logger.debug(f'getting {image_size.name} image grid of {len(objects)} objects at width {image_width}')

    downloaded = []
    for counter, iter_object in enumerate(objects):
        logger.debug(f'downloading image {counter} of {len(objects)}')
        try:
            downloaded.append(net.download_image_by_size(iter_object, size=image_size))
        except ImageSizeNotAvailableException:
            logger.error(f'{image_size.name} image not available for {iter_object.name}')

    return arrange_cover_grid(images=downloaded, width=image_width)
|
||||
|
||||
|
||||
def chunk(l, n):
    """Yield consecutive n-sized slices of sequence l; the last may be shorter."""
    yield from (l[offset:offset + n] for offset in range(0, len(l), n))
|
||||
|
||||
|
||||
def generate_album_chart_grid(net: Network,
                              chart_range: Network.Range,
                              image_size: Image.Size = Image.Size.extralarge,
                              limit: int = 100,
                              image_width: int = 5):
    """Pull the user's top albums over chart_range and render them as a cover grid."""
    top_albums = net.get_top_albums(period=chart_range, limit=limit)
    return get_image_grid_from_objects(net=net,
                                       objects=top_albums,
                                       image_size=image_size,
                                       image_width=image_width)
|
@ -22,6 +22,9 @@ class Image:
|
||||
self.size = size
|
||||
self.link = link
|
||||
|
||||
def __str__(self):
    """Human-readable form: '<size name> - <link>'."""
    return '{} - {}'.format(self.size.name, self.link)
|
||||
|
||||
|
||||
class Wiki:
|
||||
def __init__(self,
|
||||
@ -65,6 +68,23 @@ class LastFM:
|
||||
f'listeners({self.listeners}), wiki({self.wiki})'
|
||||
|
||||
|
||||
class WeeklyChart:
    """A last.fm weekly chart window, stored as unix epoch seconds."""

    def __init__(self, from_time, to_time):
        # window boundaries in unix seconds
        self.from_secs, self.to_secs = from_time, to_time

    @property
    def from_date(self):
        # window start as a local-time datetime
        return datetime.fromtimestamp(self.from_secs)

    @property
    def to_date(self):
        # window end as a local-time datetime
        return datetime.fromtimestamp(self.to_secs)

    def __str__(self):
        return '{} -> {}'.format(self.from_secs, self.to_secs)
|
||||
|
||||
|
||||
class Scrobble:
|
||||
def __init__(self,
|
||||
track: Track = None,
|
||||
|
@ -1,11 +1,14 @@
|
||||
import requests
|
||||
from typing import Optional, List
|
||||
from typing import Optional, List, Union
|
||||
from copy import deepcopy
|
||||
import logging
|
||||
from enum import Enum
|
||||
from datetime import datetime, date, time, timedelta
|
||||
|
||||
from fmframework.model.fm import Scrobble, Wiki, Image
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
from fmframework.model.fm import Scrobble, Wiki, Image, WeeklyChart
|
||||
from fmframework.model.track import Track
|
||||
from fmframework.model.album import Album
|
||||
from fmframework.model.artist import Artist
|
||||
@ -13,6 +16,10 @@ from fmframework.model.artist import Artist
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImageSizeNotAvailableException(Exception):
    """Raised by Network.download_image_by_size when no image of the requested Image.Size is found."""
    pass
|
||||
|
||||
|
||||
class Network:
|
||||
|
||||
class Range(Enum):
|
||||
@ -259,6 +266,85 @@ class Network:
|
||||
|
||||
return [self.parse_artist(i) for i in iterator.items]
|
||||
|
||||
def download_image_by_size(self, fm_object: Union[Track, Album, Artist], size: Image.Size):
    """Download fm_object's artwork at the requested size.

    Returns the decoded image from download_image.

    Raises:
        ImageSizeNotAvailableException: when fm_object has no image list at
            all, or no image of the requested size.
    """
    try:
        images = fm_object.images
    except AttributeError:
        logger.error(f'{fm_object} has no images')
        # previously this path fell through and returned None, which grid
        # callers cannot handle; raise the exception they already catch
        # for a missing size
        raise ImageSizeNotAvailableException

    image_pointer = next((i for i in images if i.size == size), None)
    if image_pointer is None:
        logger.error(f'image of size {size.name} not found')
        raise ImageSizeNotAvailableException

    return self.download_image(image_pointer=image_pointer)
|
||||
|
||||
@staticmethod
def download_image(image_pointer: Image):
    """Fetch the image behind image_pointer and decode it with OpenCV.

    Returns the decoded array on a 2xx response, None otherwise.
    """
    logger.info(f'downloading {image_pointer.size.name} image - {image_pointer.link}')
    resp = requests.get(image_pointer.link, stream=True)

    if not 200 <= resp.status_code < 300:
        logger.error(f'http error {resp.status_code}')
        return None

    raw = np.asarray(bytearray(resp.content), dtype="uint8")
    return cv2.imdecode(raw, cv2.IMREAD_COLOR)
|
||||
|
||||
def get_weekly_charts(self, username: str = None):
    """List the available weekly chart windows for a user.

    Defaults to the network's own username; returns a list of WeeklyChart,
    or None when the API gives no response.
    """
    logger.info('getting weekly chart list')

    params = {'user': self.username if username is None else username}
    resp = self.get_request('user.getweeklychartlist', params=params)

    if not resp:
        logger.error('no response')
        return None

    charts = resp.get('weeklychartlist', {}).get('chart', [])
    return [WeeklyChart(from_time=int(entry['from']), to_time=int(entry['to'])) for entry in charts]
|
||||
|
||||
def get_weekly_chart(self,
                     object_type,
                     chart: WeeklyChart = None,
                     from_time: int = None,
                     to_time: int = None,
                     username: str = None,
                     limit: int = None):
    """Pull a user's weekly album/artist/track chart for a time window.

    The window comes from `chart` when given, otherwise from the
    `from_time`/`to_time` epoch seconds. Returns a list of parsed objects,
    or None when the API gives no response.

    Raises:
        ValueError: for an unknown object_type or when no time range is
            supplied at all.
    """
    if object_type not in ['album', 'artist', 'track']:
        raise ValueError('invalid object type')

    if chart is None and (from_time is None or to_time is None):
        raise ValueError('no time range')

    if chart is not None:
        from_time = chart.from_secs
        to_time = chart.to_secs

    # derive display dates from the resolved epoch range; the previous code
    # read chart.from_date/chart.to_date here and crashed with
    # AttributeError when only from_time/to_time were supplied
    from_display = datetime.fromtimestamp(from_time)
    to_display = datetime.fromtimestamp(to_time)

    if limit is not None:
        logger.info(f'pulling top {limit} {object_type}s from {from_display} to {to_display} '
                    f'for {self.username if username is None else username}')
    else:
        logger.info(f'pulling top {object_type}s from {from_display} to {to_display} '
                    f'for {self.username if username is None else username}')

    # NOTE(review): limit only affects logging – it is never sent to the
    # API in params; confirm whether the endpoint supports it at all
    params = {
        'user': self.username if username is None else username,
        'from': from_time,
        'to': to_time
    }

    resp = self.get_request(method=f'user.getweekly{object_type}chart', params=params)

    if resp:
        if object_type == 'track':
            return [self.parse_track(i) for i in resp.get('weeklytrackchart', {}).get('track', [])]
        elif object_type == 'album':
            return [self.parse_album(i) for i in resp.get('weeklyalbumchart', {}).get('album', [])]
        elif object_type == 'artist':
            return [self.parse_artist(i) for i in resp.get('weeklyartistchart', {}).get('artist', [])]
    else:
        logger.error('no response')
|
||||
|
||||
@staticmethod
|
||||
def parse_wiki(wiki_dict) -> Optional[Wiki]:
|
||||
if wiki_dict:
|
||||
|
Loading…
Reference in New Issue
Block a user