batch pulling durations

This commit is contained in:
Andy Pack 2022-10-10 11:47:50 +01:00
parent 79da0108d3
commit 492b1dd424
Signed by: sarsoo
GPG Key ID: A55BA3536A5E0ED7
8 changed files with 376 additions and 31 deletions

View File

@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Selector.Model;
using SpotifyAPI.Web;
using StackExchange.Redis;
namespace Selector.CLI
{
@ -10,7 +11,8 @@ namespace Selector.CLI
{
public RootOptions Config { get; set; }
public ILoggerFactory Logger { get; set; }
public ISpotifyClient Spotify{ get; set; }
public ISpotifyClient Spotify { get; set; }
public ConnectionMultiplexer RedisMux { get; set; }
public DbContextOptionsBuilder<ApplicationDbContext> DatabaseConfig { get; set; }
public LastfmClient LastFmClient { get; set; }

View File

@ -9,6 +9,7 @@ using System.CommandLine;
using System.CommandLine.Invocation;
using System.Linq;
using System.Collections.Generic;
using Selector.Cache;
namespace Selector.CLI
{
@ -37,15 +38,20 @@ namespace Selector.CLI
try
{
var context = new CommandContext().WithLogger().WithDb(connectionString).WithLastfmApi();
var context = new CommandContext().WithLogger().WithDb(connectionString).WithSpotify().WithRedis();
var logger = context.Logger.CreateLogger("Scrobble");
using var db = new ApplicationDbContext(context.DatabaseConfig.Options, context.Logger.CreateLogger<ApplicationDbContext>());
var historyPersister = new HistoryPersister(db, new DataJsonContext(), new()
{
Username = username
}, context.Logger.CreateLogger<HistoryPersister>());
Username = username,
Apply50PercentRule = true
},
durationPuller: new(context.Logger.CreateLogger<DurationPuller>(),
context.Spotify.Tracks,
cache: context.RedisMux.GetDatabase()),
logger: context.Logger.CreateLogger<HistoryPersister>());
logger.LogInformation("Preparing to parse from {} for {}", path, username);

View File

@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Selector.Model;
using SpotifyAPI.Web;
using StackExchange.Redis;
using System.Linq;
namespace Selector.CLI.Extensions
@ -91,5 +92,21 @@ namespace Selector.CLI.Extensions
return context;
}
/// <summary>
/// Opens a Redis connection for the given command context and stores the
/// resulting multiplexer on <c>context.RedisMux</c>.
/// </summary>
/// <param name="context">Context to attach the Redis connection to.</param>
/// <returns>The same <paramref name="context"/> instance, to allow call chaining.</returns>
public static CommandContext WithRedis(this CommandContext context)
{
    // The connection string lives in the config, so make sure it has been loaded first
    // (presumably WithConfig populates context.Config - confirm against its definition).
    if (context.Config is null) context.WithConfig();

    context.RedisMux = ConnectionMultiplexer.Connect(context.Config.RedisOptions.ConnectionString);

    return context;
}
}
}

View File

@ -0,0 +1,26 @@
using System;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Selector.Cache;
/// <summary>
/// Convenience helpers for reading and writing per-track values in the Redis cache.
/// Every helper tolerates a null <c>cache</c> so callers can run without Redis.
/// </summary>
public static class CacheExtensions
{
    /// <summary>
    /// Reads the <see cref="Key.Duration"/> hash field stored under the track's cache key.
    /// </summary>
    /// <param name="cache">Redis database to read from; may be null.</param>
    /// <param name="trackId">Track identifier used to build the cache key.</param>
    /// <returns>
    /// The cached value converted to <c>int?</c>, or null when <paramref name="cache"/> is null
    /// or the field is not present.
    /// </returns>
    public static async Task<int?> GetTrackDuration(this IDatabaseAsync cache, string trackId)
    {
        // "await cache?.X()" awaits a null Task when cache is null and throws
        // NullReferenceException, so the null case has to be handled explicitly.
        if (cache is null) return null;

        return (int?) await cache.HashGetAsync(Key.Track(trackId), Key.Duration);
    }

    /// <summary>
    /// Writes <paramref name="duration"/> to the <see cref="Key.Duration"/> hash field of the
    /// track's cache key and optionally sets an expiry on that key.
    /// </summary>
    /// <param name="cache">Redis database to write to; when null the call is a no-op.</param>
    /// <param name="trackId">Track identifier used to build the cache key.</param>
    /// <param name="duration">Value to store.</param>
    /// <param name="expiry">Optional time-to-live applied to the whole track key.</param>
    public static async Task SetTrackDuration(this IDatabaseAsync cache, string trackId, int duration, TimeSpan? expiry = null)
    {
        if (cache is null) return;

        var trackCacheKey = Key.Track(trackId);

        await cache.HashSetAsync(trackCacheKey, Key.Duration, duration);

        if (expiry is not null)
        {
            // The expiry applies to the key, i.e. to every field in the track's hash.
            await cache.KeyExpireAsync(trackCacheKey, expiry);
        }
    }
}

View File

@ -20,6 +20,7 @@ namespace Selector.Cache
public const string AudioFeatureName = "AUDIO_FEATURE";
public const string PlayCountName = "PLAY_COUNT";
public const string Duration = "DURATION";
public const string SpotifyName = "SPOTIFY";
public const string LastfmName = "LASTFM";
@ -34,6 +35,9 @@ namespace Selector.Cache
public static string CurrentlyPlaying(string user) => MajorNamespace(MinorNamespace(UserName, CurrentlyPlayingName), user);
public static readonly string AllCurrentlyPlaying = CurrentlyPlaying(All);
public static string Track(string trackId) => MajorNamespace(TrackName, trackId);
public static readonly string AllTracks = Track(All);
public static string AudioFeature(string trackId) => MajorNamespace(MinorNamespace(TrackName, AudioFeatureName), trackId);
public static readonly string AllAudioFeatures = AudioFeature(All);

View File

@ -0,0 +1,183 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using SpotifyAPI.Web;
using StackExchange.Redis;
namespace Selector.Cache
{
/// <summary>
/// Resolves track durations, consulting the Redis cache first (when one is supplied)
/// and falling back to the Spotify tracks API, writing pulled values back to the cache.
/// </summary>
public class DurationPuller
{
    /// <summary>Number of times a failed Spotify request is retried before the error is rethrown.</summary>
    private const int MaxRetries = 4;

    /// <summary>Maximum number of ids sent to Spotify in one batched request.</summary>
    private const int ChunkSize = 50;

    /// <summary>Time-to-live applied to a track's cache key when a duration is written.</summary>
    private static readonly TimeSpan CacheExpiry = TimeSpan.FromDays(7);

    private readonly IDatabaseAsync Cache;
    private readonly ILogger<DurationPuller> Logger;
    protected readonly ITracksClient SpotifyClient;

    /// <param name="logger">Logger for cache misses and API errors.</param>
    /// <param name="spotifyClient">Spotify tracks client used on cache misses.</param>
    /// <param name="cache">Optional Redis database; when null every lookup goes to Spotify.</param>
    public DurationPuller(
        ILogger<DurationPuller> logger,
        ITracksClient spotifyClient,
        IDatabaseAsync cache = null
    )
    {
        Cache = cache;
        Logger = logger;
        SpotifyClient = spotifyClient;
    }

    /// <summary>
    /// Gets the duration of a single track.
    /// </summary>
    /// <param name="uri">Track URI or bare id; only the text after the last ':' is used.</param>
    /// <returns>The cached duration if present, otherwise the <c>DurationMs</c> reported by Spotify.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null, empty or whitespace.</exception>
    public async Task<int?> Get(string uri)
    {
        if (string.IsNullOrWhiteSpace(uri)) throw new ArgumentNullException(nameof(uri), "No uri provided");

        var trackId = uri.Split(":").Last();

        var cached = await GetCached(trackId);
        if (cached is not null) return cached;

        Logger.LogDebug("Missed cache, pulling");

        var info = await WithRetries(() => SpotifyClient.Get(trackId), TimeSpan.FromSeconds(2));
        await SetCached(trackId, info.DurationMs);

        return info.DurationMs;
    }

    /// <summary>
    /// Gets the durations of many tracks, batching cache misses into chunked Spotify requests.
    /// </summary>
    /// <param name="uri">Track URIs or bare ids; null/blank entries are ignored.</param>
    /// <returns>
    /// A map from track id (the text after the last ':' of each input, NOT the full URI)
    /// to duration. Tracks Spotify returns no data for are absent from the map. An empty
    /// input yields an empty map.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null.</exception>
    public async Task<IDictionary<string, int>> Get(IEnumerable<string> uri)
    {
        if (uri is null) throw new ArgumentNullException(nameof(uri), "No URIs provided");

        // Materialise once: the input may be a lazy query, and duplicates would only
        // cause repeated cache reads and redundant ids in the Spotify requests.
        var trackIds = uri.Where(x => !string.IsNullOrWhiteSpace(x))
                          .Select(x => x.Split(":").Last())
                          .Distinct()
                          .ToList();

        var ret = new Dictionary<string, int>();
        var toPullFromSpotify = new List<string>();

        foreach (var trackId in trackIds)
        {
            var cached = await GetCached(trackId);

            if (cached is int duration)
            {
                ret[trackId] = duration;
            }
            else
            {
                toPullFromSpotify.Add(trackId);
            }
        }

        foreach (var chunk in toPullFromSpotify.Chunk(ChunkSize))
        {
            await PullChunk(chunk, ret);

            // brief pause between batches to go easy on the API
            await Task.Delay(TimeSpan.FromMilliseconds(200));
        }

        return ret;
    }

    /// <summary>Pulls one batch of ids from Spotify, caching and recording each returned duration.</summary>
    private async Task PullChunk(IList<string> toPull, IDictionary<string, int> ret)
    {
        var info = await WithRetries(() => SpotifyClient.GetSeveral(new(toPull)), TimeSpan.FromSeconds(5));

        // The response can contain null entries for ids Spotify does not recognise; skip them.
        foreach (var resp in info.Tracks.Where(t => t is not null))
        {
            await SetCached(resp.Id, resp.DurationMs);
            ret[resp.Id] = (int) resp.DurationMs;
        }
    }

    /// <summary>Reads a duration from the cache; null when there is no cache or no stored value.</summary>
    private async Task<int?> GetCached(string trackId)
    {
        // Awaiting "Cache?.HashGetAsync(...)" would throw when Cache is null, so guard explicitly.
        if (Cache is null) return null;

        var cachedVal = await Cache.HashGetAsync(Key.Track(trackId), Key.Duration);

        return cachedVal.IsNullOrEmpty ? null : (int?) cachedVal;
    }

    /// <summary>Writes a duration to the cache, if a cache was supplied.</summary>
    private async Task SetCached(string trackId, int duration)
    {
        if (Cache is null) return;

        await Cache.SetTrackDuration(trackId, duration, CacheExpiry);
    }

    /// <summary>
    /// Runs a Spotify request, retrying up to <see cref="MaxRetries"/> times on rate-limit
    /// and general API errors. The retry count is local to each call, so one exhausted
    /// request does not use up the retry budget of later ones.
    /// </summary>
    /// <param name="request">The request to execute.</param>
    /// <param name="errorDelay">Wait applied before retrying after a general API error.</param>
    private async Task<T> WithRetries<T>(Func<Task<T>> request, TimeSpan errorDelay)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return await request();
            }
            catch (APIUnauthorizedException e)
            {
                Logger.LogError("Unauthorised error: [{message}] (should be refreshed and retried?)", e.Message);
                throw; // bare rethrow keeps the original stack trace
            }
            catch (APITooManyRequestsException e) when (attempt < MaxRetries)
            {
                Logger.LogWarning("Too many requests error, retrying ({}): [{message}]", e.RetryAfter, e.Message);
                await Task.Delay(e.RetryAfter);
            }
            catch (APITooManyRequestsException e)
            {
                Logger.LogError("Too many requests error, done retrying: [{message}]", e.Message);
                throw;
            }
            catch (APIException e) when (attempt < MaxRetries)
            {
                Logger.LogWarning("API error, retrying: [{message}]", e.Message);
                await Task.Delay(errorDelay);
            }
            catch (APIException e)
            {
                Logger.LogError("API error, done retrying: [{message}]", e.Message);
                throw;
            }
        }
    }
}
}

View File

@ -1,6 +1,8 @@
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Selector.Cache;
using Selector.Model;
using static SpotifyAPI.Web.PlaylistRemoveItemsRequest;
namespace Selector.Data;
@ -17,14 +19,31 @@ public class HistoryPersister
private ApplicationDbContext Db { get; set; }
private DataJsonContext Json { get; set; }
private DurationPuller DurationPuller { get; set; }
private ILogger<HistoryPersister> Logger { get; set; }
public HistoryPersister(ApplicationDbContext db, DataJsonContext json, HistoryPersisterConfig config, ILogger<HistoryPersister> logger = null)
private readonly Dictionary<string, int> Durations;
/// <summary>
/// Creates a persister for imported listening history.
/// </summary>
/// <param name="db">Database context the listens are written to.</param>
/// <param name="json">JSON serialisation context used when parsing input.</param>
/// <param name="config">Persister options.</param>
/// <param name="durationPuller">
/// Source of track durations; required when <c>config.Apply50PercentRule</c> is set.
/// </param>
/// <param name="logger">Optional logger.</param>
/// <exception cref="ArgumentNullException">
/// <c>config.Apply50PercentRule</c> is set but <paramref name="durationPuller"/> is null.
/// </exception>
public HistoryPersister(
    ApplicationDbContext db,
    DataJsonContext json,
    HistoryPersisterConfig config,
    DurationPuller durationPuller = null,
    ILogger<HistoryPersister> logger = null)
{
    // Validate before assigning anything, and report the parameter (not the property) name.
    if (config.Apply50PercentRule && durationPuller is null)
    {
        throw new ArgumentNullException(nameof(durationPuller));
    }

    Config = config;
    Db = db;
    Json = json;
    DurationPuller = durationPuller;
    Logger = logger;

    Durations = new();
}
public void Process(string input)
@ -48,7 +67,7 @@ public class HistoryPersister
var parsed = await JsonSerializer.DeserializeAsync(singleInput, Json.EndSongArray);
songs = songs.Concat(parsed);
Logger?.LogDebug("Parsed {.2f} items for {}", parsed.Length, Config.Username);
Logger?.LogDebug("Parsed {:n0} items for {}", parsed.Length, Config.Username);
}
await Process(songs);
@ -56,47 +75,134 @@ public class HistoryPersister
public async Task Process(IEnumerable<EndSong> input)
{
var user = Db.Users.Single(u => u.UserName == Config.Username);
if (Config.InitialClear)
{
var latestTime = input.OrderBy(x => x.ts).Last().ts;
var time = DateTime.Parse(latestTime).ToUniversalTime();
Db.SpotifyListen.RemoveRange(Db.SpotifyListen.Where(x => x.User.UserName == Config.Username && x.Timestamp <= time));
Db.SpotifyListen.RemoveRange(Db.SpotifyListen.Where(x => x.UserId == user.Id && x.Timestamp <= time));
}
var user = Db.Users.Single(u => u.UserName == Config.Username);
var filtered = input.Where(x => x.ms_played > 30000
&& !string.IsNullOrWhiteSpace(x.master_metadata_track_name))
.DistinctBy(x => (x.offline_timestamp, x.ts, x.spotify_track_uri))
.ToArray();
var counter = 0;
Logger.LogInformation("{:n0} items after filtering", filtered.Length);
var filtered = input.Where(x => x.ms_played > 30000)
.DistinctBy(x => (x.offline_timestamp, x.ts, x.spotify_track_uri))
.ToArray();
Logger.LogInformation("{.2f} items after filtering", filtered.Length);
foreach (var item in filtered)
var processedCounter = 0;
foreach (var item in filtered.Chunk(1000))
{
if(!string.IsNullOrWhiteSpace(item.master_metadata_track_name))
IEnumerable<EndSong> toPopulate = item;
if (Config.Apply50PercentRule)
{
Db.SpotifyListen.Add(new()
{
TrackName = item.master_metadata_track_name,
AlbumName = item.master_metadata_album_album_name,
ArtistName = item.master_metadata_album_artist_name,
Logger.LogDebug("Validating tracks {:n0}/{:n0}", processedCounter + 1, filtered.Length);
Timestamp = DateTime.Parse(item.ts).ToUniversalTime(),
PlayedDuration = item.ms_played,
TrackUri = item.spotify_track_uri,
UserId = user.Id
});
counter++;
toPopulate = Passes50PcRule(toPopulate);
}
Db.SpotifyListen.AddRange(toPopulate.Select(x => new SpotifyListen()
{
TrackName = x.master_metadata_track_name,
AlbumName = x.master_metadata_album_album_name,
ArtistName = x.master_metadata_album_artist_name,
Timestamp = DateTime.Parse(x.ts).ToUniversalTime(),
PlayedDuration = x.ms_played,
TrackUri = x.spotify_track_uri,
UserId = user.Id
}));
processedCounter += item.Length;
}
Logger?.LogInformation("Added {} historical items for {}", counter, user.UserName);
Logger?.LogInformation("Added {:n0} historical items for {}", processedCounter, user.UserName);
await Db.SaveChangesAsync();
}
private const int FOUR_MINUTES = 4 * 60 * 1000;
/// <summary>
/// Checks a single listen against the play-length rule implemented by <c>CheckDuration</c>,
/// resolving the track's duration from the in-memory map or, failing that, the duration puller.
/// </summary>
/// <param name="song">The listen to validate.</param>
/// <returns>
/// True when the listen has no track URI, when no duration could be obtained,
/// or when <c>CheckDuration</c> accepts it; false otherwise.
/// </returns>
public async Task<bool> Passes50PcRule(EndSong song)
{
    var uri = song.spotify_track_uri;

    // nothing to look a duration up with, so let the listen through
    if (string.IsNullOrWhiteSpace(uri)) return true;

    if (!Durations.TryGetValue(uri, out var duration))
    {
        var pulledDuration = await DurationPuller.Get(uri);

        if (pulledDuration is not int d)
        {
            // Logger is optional (the constructor defaults it to null), hence the null-conditional call.
            Logger?.LogDebug("No duration returned for {}/{}", song.master_metadata_track_name, song.master_metadata_album_artist_name);
            return true; // if can't get duration, just pass
        }

        duration = d;
        Durations[uri] = duration;
    }

    return CheckDuration(song, duration);
}
/// <summary>
/// Lazily filters a batch of listens with <c>CheckDuration</c>. Listens whose duration is
/// already in the in-memory map are decided first; the rest are resolved with one batched
/// call to the duration puller and then decided.
/// </summary>
/// <param name="inputTracks">Listens to validate.</param>
/// <returns>
/// The listens that pass, plus any listen without a track URI or for which no duration
/// could be obtained. Locally-resolved listens are yielded before pulled ones.
/// </returns>
public IEnumerable<EndSong> Passes50PcRule(IEnumerable<EndSong> inputTracks)
{
    var toPullOverWire = new List<EndSong>();

    // quick return items from local cache
    foreach (var track in inputTracks)
    {
        if (string.IsNullOrWhiteSpace(track.spotify_track_uri))
        {
            // No URI to resolve a duration with: pass it through and move on, otherwise a
            // null key reaches the dictionary lookup (or a blank URI is queued as well).
            yield return track;
            continue;
        }

        if (Durations.TryGetValue(track.spotify_track_uri, out var duration))
        {
            if (CheckDuration(track, duration))
            {
                yield return track;
            }
        }
        else
        {
            toPullOverWire.Add(track);
        }
    }

    // everything was resolved locally, so there is nothing to ask the puller for
    if (toPullOverWire.Count == 0) yield break;

    // An iterator method cannot await; GetAwaiter().GetResult() blocks like .Result but
    // surfaces the original exception instead of wrapping it in an AggregateException.
    var pulledDuration = DurationPuller
        .Get(toPullOverWire.Select(x => x.spotify_track_uri).Distinct())
        .GetAwaiter()
        .GetResult();

    // check return acceptable tracks from pulled
    foreach (var track in toPullOverWire)
    {
        var uri = track.spotify_track_uri;

        // The puller keys its results by track id (the last ':'-separated segment of the
        // URI), so look that up first; the full URI is tried as a fallback.
        var trackId = uri.Split(':').Last();

        if (pulledDuration.TryGetValue(trackId, out var duration)
            || pulledDuration.TryGetValue(uri, out duration))
        {
            // remember under the URI, which is the key the quick path above reads
            Durations[uri] = duration;

            if (CheckDuration(track, duration))
            {
                yield return track;
            }
        }
        else
        {
            // if can't get duration, just pass
            yield return track;
        }
    }
}
/// <summary>
/// Decides whether a listen was played long enough to count.
/// </summary>
/// <param name="song">The listen; only its <c>ms_played</c> value is examined.</param>
/// <param name="duration">Full track duration, in the same unit as <c>ms_played</c>.</param>
/// <returns>
/// True when the played time is at least half of <paramref name="duration"/> (integer
/// division) or at least <c>FOUR_MINUTES</c>.
/// </returns>
public bool CheckDuration(EndSong song, int duration)
{
    var halfOfTrack = duration / 2;

    if (song.ms_played >= halfOfTrack)
    {
        return true;
    }

    return song.ms_played >= FOUR_MINUTES;
}
}

View File

@ -7,6 +7,7 @@
<ItemGroup>
<ProjectReference Include="..\Selector.Model\Selector.Model.csproj" />
<ProjectReference Include="..\Selector.Cache\Selector.Cache.csproj" />
</ItemGroup>
<ItemGroup>