adding scrobble request with retries, in parallel with db
fixes #35 fixes #36
This commit is contained in:
parent
e8c895f68b
commit
a10fc490bf
@ -30,6 +30,10 @@ namespace Selector.CLI
|
||||
delayOption.AddAlias("-d");
|
||||
AddOption(delayOption);
|
||||
|
||||
var simulOption = new Option<int>("--simultaneous", getDefaultValue: () => 5, "simultaneous connections when pulling");
|
||||
simulOption.AddAlias("-s");
|
||||
AddOption(simulOption);
|
||||
|
||||
var username = new Option<string>("--username", "user to pulls scrobbles for");
|
||||
username.AddAlias("-u");
|
||||
AddOption(username);
|
||||
@ -42,10 +46,10 @@ namespace Selector.CLI
|
||||
dontRemove.AddAlias("-nr");
|
||||
AddOption(dontRemove);
|
||||
|
||||
Handler = CommandHandler.Create(async (DateTime from, DateTime to, int page, int delay, string username, bool noAdd, bool noRemove, CancellationToken token) => await Execute(from, to, page, delay, username, noAdd, noRemove, token));
|
||||
Handler = CommandHandler.Create(async (DateTime from, DateTime to, int page, int delay, int simul, string username, bool noAdd, bool noRemove, CancellationToken token) => await Execute(from, to, page, delay, simul, username, noAdd, noRemove, token));
|
||||
}
|
||||
|
||||
public static async Task<int> Execute(DateTime from, DateTime to, int page, int delay, string username, bool noAdd, bool noRemove, CancellationToken token)
|
||||
public static async Task<int> Execute(DateTime from, DateTime to, int page, int delay, int simul, string username, bool noAdd, bool noRemove, CancellationToken token)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -75,10 +79,12 @@ namespace Selector.CLI
|
||||
To = to,
|
||||
PageSize = page,
|
||||
DontAdd = noAdd,
|
||||
DontRemove = noRemove
|
||||
DontRemove = noRemove,
|
||||
SimultaneousConnections = simul
|
||||
},
|
||||
db,
|
||||
context.Logger.CreateLogger<ScrobbleSaver>())
|
||||
context.Logger.CreateLogger<ScrobbleSaver>(),
|
||||
context.Logger)
|
||||
.Execute(token);
|
||||
}
|
||||
else
|
||||
|
@ -1,10 +1,11 @@
|
||||
using IF.Lastfm.Core.Api;
|
||||
using IF.Lastfm.Core.Api.Helpers;
|
||||
using IF.Lastfm.Core.Objects;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Selector.Model;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
@ -18,10 +19,12 @@ namespace Selector
|
||||
{
|
||||
public ApplicationUser User { get; set; }
|
||||
public TimeSpan InterRequestDelay { get; set; }
|
||||
public TimeSpan Timeout { get; set; } = new TimeSpan(0, 20, 0);
|
||||
public DateTime? From { get; set; }
|
||||
public DateTime? To { get; set; }
|
||||
public int PageSize { get; set; } = 100;
|
||||
public int Retries { get; set; } = 5;
|
||||
public int SimultaneousConnections { get; set; } = 3;
|
||||
public bool DontAdd { get; set; } = false;
|
||||
public bool DontRemove { get; set; } = false;
|
||||
}
|
||||
@ -29,85 +32,77 @@ namespace Selector
|
||||
public class ScrobbleSaver
|
||||
{
|
||||
private readonly ILogger<ScrobbleSaver> logger;
|
||||
private readonly ILoggerFactory loggerFactory;
|
||||
|
||||
private readonly IUserApi userClient;
|
||||
private readonly ScrobbleSaverConfig config;
|
||||
private CancellationToken _token;
|
||||
private Task aggregateNetworkTask;
|
||||
|
||||
private readonly ApplicationDbContext db;
|
||||
|
||||
public ScrobbleSaver(IUserApi _userClient, ScrobbleSaverConfig _config, ApplicationDbContext _db, ILogger<ScrobbleSaver> _logger)
|
||||
private ConcurrentQueue<ScrobbleRequest> waitingRequests = new();
|
||||
private ConcurrentQueue<ScrobbleRequest> runRequests = new();
|
||||
|
||||
public ScrobbleSaver(IUserApi _userClient, ScrobbleSaverConfig _config, ApplicationDbContext _db, ILogger<ScrobbleSaver> _logger, ILoggerFactory _loggerFactory = null)
|
||||
{
|
||||
userClient = _userClient;
|
||||
config = _config;
|
||||
db = _db;
|
||||
logger = _logger;
|
||||
loggerFactory = _loggerFactory;
|
||||
}
|
||||
|
||||
public async Task Execute(CancellationToken token)
|
||||
{
|
||||
logger.LogInformation("Saving scrobbles for {0}/{1}", config.User.UserName, config.User.LastFmUsername);
|
||||
_token = token;
|
||||
|
||||
var page1 = await userClient.GetRecentScrobbles(config.User.LastFmUsername, count: config.PageSize, from: config.From, to: config.To);
|
||||
var page1 = new ScrobbleRequest(userClient,
|
||||
loggerFactory?.CreateLogger<ScrobbleRequest>() ?? NullLogger<ScrobbleRequest>.Instance,
|
||||
config.User.UserName,
|
||||
1,
|
||||
config.PageSize,
|
||||
config.From, config.To);
|
||||
|
||||
if(page1.Success)
|
||||
await page1.Execute();
|
||||
runRequests.Enqueue(page1);
|
||||
|
||||
if (page1.Succeeded)
|
||||
{
|
||||
var scrobbles = page1.Content.ToList();
|
||||
|
||||
if (page1.TotalPages > 1)
|
||||
{
|
||||
var tasks = await GetScrobblesFromPageNumbers(2, page1.TotalPages, token);
|
||||
var taskResults = await Task.WhenAll(tasks);
|
||||
|
||||
foreach (var result in taskResults)
|
||||
{
|
||||
if (result.Success)
|
||||
{
|
||||
scrobbles.AddRange(result.Content);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogWarning("Failed to get a subset of scrobbles for {0}/{1}", config.User.UserName, config.User.LastFmUsername);
|
||||
}
|
||||
}
|
||||
TriggerNetworkRequests(page1.TotalPages, token);
|
||||
}
|
||||
|
||||
logger.LogDebug("Pulling currently stored scrobbles");
|
||||
|
||||
var currentScrobblesPulled = GetDbScrobbles();
|
||||
|
||||
await aggregateNetworkTask;
|
||||
var scrobbles = runRequests.SelectMany(r => r.Scrobbles);
|
||||
|
||||
IdentifyDuplicates(scrobbles);
|
||||
|
||||
logger.LogDebug("Ordering and filtering pulled scrobbles");
|
||||
|
||||
RemoveNowPlaying(scrobbles.ToList());
|
||||
|
||||
var nativeScrobbles = scrobbles
|
||||
.DistinctBy(s => s.TimePlayed?.UtcDateTime)
|
||||
.Select(s =>
|
||||
{
|
||||
var nativeScrobble = (UserScrobble) s;
|
||||
var nativeScrobble = (UserScrobble)s;
|
||||
nativeScrobble.UserId = config.User.Id;
|
||||
return nativeScrobble;
|
||||
});
|
||||
|
||||
logger.LogDebug("Pulling currently stored scrobbles");
|
||||
|
||||
var currentScrobbles = db.Scrobble
|
||||
.AsEnumerable()
|
||||
.Where(s => s.UserId == config.User.Id);
|
||||
|
||||
if (config.From is not null)
|
||||
{
|
||||
currentScrobbles = currentScrobbles.Where(s => s.Timestamp > config.From);
|
||||
}
|
||||
|
||||
if (config.To is not null)
|
||||
{
|
||||
currentScrobbles = currentScrobbles.Where(s => s.Timestamp < config.To);
|
||||
}
|
||||
|
||||
logger.LogInformation("Completed scrobble pulling for {0}, pulled {1:n0}", config.User.UserName, nativeScrobbles.Count());
|
||||
logger.LogInformation("Completed database scrobble pulling for {0}, pulled {1:n0}", config.User.UserName, nativeScrobbles.Count());
|
||||
|
||||
logger.LogDebug("Identifying difference sets");
|
||||
var time = Stopwatch.StartNew();
|
||||
|
||||
(var toAdd, var toRemove) = ScrobbleMatcher.IdentifyDiffs(currentScrobbles, nativeScrobbles);
|
||||
|
||||
var toAddUser = toAdd.Cast<UserScrobble>().ToList();
|
||||
var toRemoveUser = toRemove.Cast<UserScrobble>().ToList();
|
||||
(var toAdd, var toRemove) = ScrobbleMatcher.IdentifyDiffs(currentScrobblesPulled, nativeScrobbles);
|
||||
|
||||
time.Stop();
|
||||
logger.LogTrace("Finished diffing: {0:n}ms", time.ElapsedMilliseconds);
|
||||
@ -116,26 +111,26 @@ namespace Selector
|
||||
|
||||
if(!config.DontAdd)
|
||||
{
|
||||
await db.Scrobble.AddRangeAsync(toAddUser);
|
||||
await db.Scrobble.AddRangeAsync(toAdd.Cast<UserScrobble>());
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogInformation("Skipping adding of {0} scrobbles", toAddUser.Count);
|
||||
logger.LogInformation("Skipping adding of {0} scrobbles", toAdd.Count());
|
||||
}
|
||||
if (!config.DontRemove)
|
||||
{
|
||||
db.Scrobble.RemoveRange(toRemoveUser);
|
||||
db.Scrobble.RemoveRange(toRemove.Cast<UserScrobble>());
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogInformation("Skipping removal of {0} scrobbles", toRemoveUser.Count);
|
||||
logger.LogInformation("Skipping removal of {0} scrobbles", toRemove.Count());
|
||||
}
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
timeDbOps.Stop();
|
||||
logger.LogTrace("DB ops: {0:n}ms", timeDbOps.ElapsedMilliseconds);
|
||||
|
||||
logger.LogInformation("Completed scrobble pulling for {0}, +{1:n0}, -{2:n0}", config.User.UserName, toAddUser.Count(), toRemoveUser.Count());
|
||||
logger.LogInformation("Completed scrobble pulling for {0}, +{1:n0}, -{2:n0}", config.User.UserName, toAdd.Count(), toRemove.Count());
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -143,21 +138,77 @@ namespace Selector
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<List<Task<PageResponse<LastTrack>>>> GetScrobblesFromPageNumbers(int start, int totalPages, CancellationToken token)
|
||||
private async void HandleSuccessfulRequest(object o, EventArgs e)
|
||||
{
|
||||
var tasks = new List<Task<PageResponse<LastTrack>>>();
|
||||
await Task.Delay(config.InterRequestDelay, _token);
|
||||
TransitionRequest();
|
||||
}
|
||||
|
||||
foreach (var pageNumber in Enumerable.Range(start, totalPages - 1))
|
||||
private void TransitionRequest()
|
||||
{
|
||||
if (waitingRequests.TryDequeue(out var request))
|
||||
{
|
||||
logger.LogInformation("Pulling page {2:n0}/{3:n0} for {0}/{1}", config.User.UserName, config.User.LastFmUsername, pageNumber, totalPages);
|
||||
|
||||
tasks.Add(userClient.GetRecentScrobbles(config.User.LastFmUsername, pagenumber: pageNumber, count: config.PageSize, from: config.From, to: config.To));
|
||||
await Task.Delay(config.InterRequestDelay, token);
|
||||
request.Success += HandleSuccessfulRequest;
|
||||
_ = request.Execute();
|
||||
runRequests.Enqueue(request);
|
||||
}
|
||||
}
|
||||
|
||||
private void TriggerNetworkRequests(int totalPages, CancellationToken token)
|
||||
{
|
||||
foreach (var req in GetRequestsFromPageNumbers(2, totalPages))
|
||||
{
|
||||
waitingRequests.Enqueue(req);
|
||||
}
|
||||
|
||||
return tasks;
|
||||
foreach (var _ in Enumerable.Range(1, config.SimultaneousConnections))
|
||||
{
|
||||
TransitionRequest();
|
||||
}
|
||||
|
||||
var timeoutTask = Task.Delay(config.Timeout, token);
|
||||
var allTasks = waitingRequests.Union(runRequests).Select(r => r.Task).ToList();
|
||||
|
||||
aggregateNetworkTask = Task.WhenAny(timeoutTask, Task.WhenAll(allTasks));
|
||||
|
||||
aggregateNetworkTask.ContinueWith(t =>
|
||||
{
|
||||
if (timeoutTask.IsCompleted)
|
||||
{
|
||||
throw new TimeoutException($"Timed-out pulling scrobbles, took {config.Timeout}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private IEnumerable<UserScrobble> GetDbScrobbles()
|
||||
{
|
||||
var currentScrobbles = db.Scrobble.AsEnumerable()
|
||||
.Where(s => s.UserId == config.User.Id);
|
||||
|
||||
if (config.From is not null)
|
||||
{
|
||||
currentScrobbles = currentScrobbles.Where(s => s.Timestamp > config.From);
|
||||
}
|
||||
|
||||
if (config.To is not null)
|
||||
{
|
||||
currentScrobbles = currentScrobbles.Where(s => s.Timestamp < config.To);
|
||||
}
|
||||
|
||||
return currentScrobbles;
|
||||
}
|
||||
|
||||
private IEnumerable<ScrobbleRequest> GetRequestsFromPageNumbers(int start, int totalPages)
|
||||
=> Enumerable.Range(start, totalPages - 1)
|
||||
.Select(n => new ScrobbleRequest(
|
||||
userClient,
|
||||
loggerFactory.CreateLogger<ScrobbleRequest>() ?? NullLogger<ScrobbleRequest>.Instance,
|
||||
config.User.UserName,
|
||||
n,
|
||||
config.PageSize,
|
||||
config.From,
|
||||
config.To));
|
||||
|
||||
private void IdentifyDuplicates(IEnumerable<LastTrack> tracks)
|
||||
{
|
||||
logger.LogDebug("Identifying duplicates");
|
||||
@ -186,5 +237,20 @@ namespace Selector
|
||||
logger.LogInformation("Duplicate at {0}: {1}", dupe.Key, dupeString.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
private bool RemoveNowPlaying(List<LastTrack> scrobbles)
|
||||
{
|
||||
var newestScrobble = scrobbles.FirstOrDefault();
|
||||
if (newestScrobble is not null)
|
||||
{
|
||||
if (newestScrobble.IsNowPlaying is bool playing && playing)
|
||||
{
|
||||
scrobbles.Remove(newestScrobble);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
102
Selector/Scrobble/ScrobbleRequest.cs
Normal file
102
Selector/Scrobble/ScrobbleRequest.cs
Normal file
@ -0,0 +1,102 @@
|
||||
using IF.Lastfm.Core.Api;
|
||||
using IF.Lastfm.Core.Api.Helpers;
|
||||
using IF.Lastfm.Core.Objects;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Selector
|
||||
{
|
||||
public class ScrobbleRequest
|
||||
{
|
||||
private readonly ILogger<ScrobbleRequest> logger;
|
||||
private readonly IUserApi userClient;
|
||||
|
||||
public event EventHandler Success;
|
||||
|
||||
public int MaxAttempts { get; private set; } = 5;
|
||||
public int Attempts { get; private set; }
|
||||
public List<LastTrack> Scrobbles { get; private set; }
|
||||
public int TotalPages { get; private set; }
|
||||
private Task<PageResponse<LastTrack>> currentTask { get; set; }
|
||||
public bool Succeeded { get; private set; } = false;
|
||||
|
||||
private string username { get; set; }
|
||||
private int pageNumber { get; set; }
|
||||
int pageSize { get; set; }
|
||||
DateTime? from { get; set; }
|
||||
DateTime? to { get; set; }
|
||||
|
||||
private TaskCompletionSource AggregateTaskSource { get; set; } = new();
|
||||
public Task Task => AggregateTaskSource.Task;
|
||||
|
||||
public ScrobbleRequest(IUserApi _userClient, ILogger<ScrobbleRequest> _logger, string _username, int _pageNumber, int _pageSize, DateTime? _from, DateTime? _to)
|
||||
{
|
||||
userClient = _userClient;
|
||||
logger = _logger;
|
||||
|
||||
username = _username;
|
||||
pageNumber = _pageNumber;
|
||||
pageSize = _pageSize;
|
||||
from = _from;
|
||||
to = _to;
|
||||
}
|
||||
|
||||
protected virtual void RaiseSampleEvent()
|
||||
{
|
||||
// Raise the event in a thread-safe manner using the ?. operator.
|
||||
Success?.Invoke(this, new EventArgs());
|
||||
}
|
||||
|
||||
public Task Execute()
|
||||
{
|
||||
logger.LogInformation("Scrobble request #{} for {} by {} from {} to {}", pageNumber, username, pageSize, from, to);
|
||||
currentTask = userClient.GetRecentScrobbles(username, pagenumber: pageNumber, count: pageSize, from: from, to: to);
|
||||
currentTask.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsCompletedSuccessfully)
|
||||
{
|
||||
var result = t.Result;
|
||||
Succeeded = result.Success;
|
||||
|
||||
if (Succeeded)
|
||||
{
|
||||
Scrobbles = result.Content.ToList();
|
||||
TotalPages = result.TotalPages;
|
||||
OnSuccess();
|
||||
AggregateTaskSource.SetResult();
|
||||
}
|
||||
else
|
||||
{
|
||||
if(Attempts < MaxAttempts)
|
||||
{
|
||||
logger.LogDebug("Request failed for {}, #{} by {}: {}, retrying ({} of {})", username, pageNumber, pageSize, result.Status, Attempts + 1, MaxAttempts);
|
||||
Execute();
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogDebug("Request failed for {}, #{} by {}: {}, max retries exceeded {}, not retrying", username, pageNumber, pageSize, result.Status, MaxAttempts);
|
||||
AggregateTaskSource.SetCanceled();
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogError("Scrobble request task faulted, {}", t.Exception);
|
||||
AggregateTaskSource.SetException(t.Exception);
|
||||
}
|
||||
});
|
||||
|
||||
Attempts++;
|
||||
return Task;
|
||||
}
|
||||
|
||||
protected virtual void OnSuccess()
|
||||
{
|
||||
// Raise the event in a thread-safe manner using the ?. operator.
|
||||
Success?.Invoke(this, new EventArgs());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user