diff --git a/Selector.CLI/Command/Scrobble/ScrobbleSave.cs b/Selector.CLI/Command/Scrobble/ScrobbleSave.cs index a507436..5c85666 100644 --- a/Selector.CLI/Command/Scrobble/ScrobbleSave.cs +++ b/Selector.CLI/Command/Scrobble/ScrobbleSave.cs @@ -30,6 +30,10 @@ namespace Selector.CLI delayOption.AddAlias("-d"); AddOption(delayOption); + var simulOption = new Option("--simultaneous", getDefaultValue: () => 5, "simultaneous connections when pulling"); + simulOption.AddAlias("-s"); + AddOption(simulOption); + var username = new Option("--username", "user to pulls scrobbles for"); username.AddAlias("-u"); AddOption(username); @@ -42,10 +46,10 @@ namespace Selector.CLI dontRemove.AddAlias("-nr"); AddOption(dontRemove); - Handler = CommandHandler.Create(async (DateTime from, DateTime to, int page, int delay, string username, bool noAdd, bool noRemove, CancellationToken token) => await Execute(from, to, page, delay, username, noAdd, noRemove, token)); + Handler = CommandHandler.Create(async (DateTime from, DateTime to, int page, int delay, int simul, string username, bool noAdd, bool noRemove, CancellationToken token) => await Execute(from, to, page, delay, simul, username, noAdd, noRemove, token)); } - public static async Task Execute(DateTime from, DateTime to, int page, int delay, string username, bool noAdd, bool noRemove, CancellationToken token) + public static async Task Execute(DateTime from, DateTime to, int page, int delay, int simul, string username, bool noAdd, bool noRemove, CancellationToken token) { try { @@ -75,10 +79,12 @@ namespace Selector.CLI To = to, PageSize = page, DontAdd = noAdd, - DontRemove = noRemove + DontRemove = noRemove, + SimultaneousConnections = simul }, db, - context.Logger.CreateLogger()) + context.Logger.CreateLogger(), + context.Logger) .Execute(token); } else diff --git a/Selector.CLI/ScrobbleSaver.cs b/Selector.CLI/ScrobbleSaver.cs index 2598252..094a645 100644 --- a/Selector.CLI/ScrobbleSaver.cs +++ b/Selector.CLI/ScrobbleSaver.cs @@ -1,10 +1,11 @@ using IF.Lastfm.Core.Api; using IF.Lastfm.Core.Api.Helpers; using IF.Lastfm.Core.Objects; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Selector.Model; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -18,10 +19,12 @@ namespace Selector { public ApplicationUser User { get; set; } public TimeSpan InterRequestDelay { get; set; } + public TimeSpan Timeout { get; set; } = new TimeSpan(0, 20, 0); public DateTime? From { get; set; } public DateTime? To { get; set; } public int PageSize { get; set; } = 100; public int Retries { get; set; } = 5; + public int SimultaneousConnections { get; set; } = 3; public bool DontAdd { get; set; } = false; public bool DontRemove { get; set; } = false; } @@ -29,85 +32,77 @@ namespace Selector public class ScrobbleSaver { private readonly ILogger logger; + private readonly ILoggerFactory loggerFactory; private readonly IUserApi userClient; private readonly ScrobbleSaverConfig config; + private CancellationToken _token; + private Task aggregateNetworkTask; + private readonly ApplicationDbContext db; - public ScrobbleSaver(IUserApi _userClient, ScrobbleSaverConfig _config, ApplicationDbContext _db, ILogger _logger) + private ConcurrentQueue waitingRequests = new(); + private ConcurrentQueue runRequests = new(); + + public ScrobbleSaver(IUserApi _userClient, ScrobbleSaverConfig _config, ApplicationDbContext _db, ILogger _logger, ILoggerFactory _loggerFactory = null) { userClient = _userClient; config = _config; db = _db; logger = _logger; + loggerFactory = _loggerFactory; } public async Task Execute(CancellationToken token) { logger.LogInformation("Saving scrobbles for {0}/{1}", config.User.UserName, config.User.LastFmUsername); + _token = token; - var page1 = await userClient.GetRecentScrobbles(config.User.LastFmUsername, count: config.PageSize, from: config.From, to: config.To); + var page1 = new ScrobbleRequest(userClient, + loggerFactory?.CreateLogger() ?? NullLogger.Instance, + config.User.UserName, + 1, + config.PageSize, + config.From, config.To); - if(page1.Success) + await page1.Execute(); + runRequests.Enqueue(page1); + + if (page1.Succeeded) { - var scrobbles = page1.Content.ToList(); - if (page1.TotalPages > 1) { - var tasks = await GetScrobblesFromPageNumbers(2, page1.TotalPages, token); - var taskResults = await Task.WhenAll(tasks); - - foreach (var result in taskResults) - { - if (result.Success) - { - scrobbles.AddRange(result.Content); - } - else - { - logger.LogWarning("Failed to get a subset of scrobbles for {0}/{1}", config.User.UserName, config.User.LastFmUsername); - } - } + TriggerNetworkRequests(page1.TotalPages, token); } + logger.LogDebug("Pulling currently stored scrobbles"); + + var currentScrobblesPulled = GetDbScrobbles(); + + await aggregateNetworkTask; + var scrobbles = runRequests.SelectMany(r => r.Scrobbles); + IdentifyDuplicates(scrobbles); logger.LogDebug("Ordering and filtering pulled scrobbles"); + RemoveNowPlaying(scrobbles.ToList()); + var nativeScrobbles = scrobbles .DistinctBy(s => s.TimePlayed?.UtcDateTime) .Select(s => { - var nativeScrobble = (UserScrobble) s; + var nativeScrobble = (UserScrobble)s; nativeScrobble.UserId = config.User.Id; return nativeScrobble; }); - logger.LogDebug("Pulling currently stored scrobbles"); - - var currentScrobbles = db.Scrobble - .AsEnumerable() - .Where(s => s.UserId == config.User.Id); - - if (config.From is not null) - { - currentScrobbles = currentScrobbles.Where(s => s.Timestamp > config.From); - } - - if (config.To is not null) - { - currentScrobbles = currentScrobbles.Where(s => s.Timestamp < config.To); - } - - logger.LogInformation("Completed scrobble pulling for {0}, pulled {1:n0}", config.User.UserName, nativeScrobbles.Count()); + logger.LogInformation("Completed database scrobble pulling for {0}, pulled {1:n0}", config.User.UserName, nativeScrobbles.Count()); logger.LogDebug("Identifying difference sets"); var time = Stopwatch.StartNew(); - (var toAdd, var toRemove) = ScrobbleMatcher.IdentifyDiffs(currentScrobbles, nativeScrobbles); - - var toAddUser = toAdd.Cast().ToList(); - var toRemoveUser = toRemove.Cast().ToList(); + (var toAdd, var toRemove) = ScrobbleMatcher.IdentifyDiffs(currentScrobblesPulled, nativeScrobbles); time.Stop(); logger.LogTrace("Finished diffing: {0:n}ms", time.ElapsedMilliseconds); @@ -116,26 +111,26 @@ namespace Selector if(!config.DontAdd) { - await db.Scrobble.AddRangeAsync(toAddUser); + await db.Scrobble.AddRangeAsync(toAdd.Cast()); } else { - logger.LogInformation("Skipping adding of {0} scrobbles", toAddUser.Count); + logger.LogInformation("Skipping adding of {0} scrobbles", toAdd.Count()); } if (!config.DontRemove) { - db.Scrobble.RemoveRange(toRemoveUser); + db.Scrobble.RemoveRange(toRemove.Cast()); } else { - logger.LogInformation("Skipping removal of {0} scrobbles", toRemoveUser.Count); + logger.LogInformation("Skipping removal of {0} scrobbles", toRemove.Count()); } await db.SaveChangesAsync(); timeDbOps.Stop(); logger.LogTrace("DB ops: {0:n}ms", timeDbOps.ElapsedMilliseconds); - logger.LogInformation("Completed scrobble pulling for {0}, +{1:n0}, -{2:n0}", config.User.UserName, toAddUser.Count(), toRemoveUser.Count()); + logger.LogInformation("Completed scrobble pulling for {0}, +{1:n0}, -{2:n0}", config.User.UserName, toAdd.Count(), toRemove.Count()); } else { @@ -143,21 +138,77 @@ namespace Selector } } - private async Task>>> GetScrobblesFromPageNumbers(int start, int totalPages, CancellationToken token) + private async void HandleSuccessfulRequest(object o, EventArgs e) { - var tasks = new List>>(); + await Task.Delay(config.InterRequestDelay, _token); + TransitionRequest(); + } - foreach (var pageNumber in Enumerable.Range(start, totalPages - 1)) + private void TransitionRequest() + { + if (waitingRequests.TryDequeue(out var request)) { - logger.LogInformation("Pulling page {2:n0}/{3:n0} for {0}/{1}", config.User.UserName, config.User.LastFmUsername, pageNumber, totalPages); - - tasks.Add(userClient.GetRecentScrobbles(config.User.LastFmUsername, pagenumber: pageNumber, count: config.PageSize, from: config.From, to: config.To)); - await Task.Delay(config.InterRequestDelay, token); + request.Success += HandleSuccessfulRequest; + _ = request.Execute(); + runRequests.Enqueue(request); + } + } + + private void TriggerNetworkRequests(int totalPages, CancellationToken token) + { + foreach (var req in GetRequestsFromPageNumbers(2, totalPages)) + { + waitingRequests.Enqueue(req); } - return tasks; + foreach (var _ in Enumerable.Range(1, config.SimultaneousConnections)) + { + TransitionRequest(); + } + + var timeoutTask = Task.Delay(config.Timeout, token); + var allTasks = waitingRequests.Union(runRequests).Select(r => r.Task).ToList(); + + aggregateNetworkTask = Task.WhenAny(timeoutTask, Task.WhenAll(allTasks)); + + aggregateNetworkTask.ContinueWith(t => + { + if (timeoutTask.IsCompleted) + { + throw new TimeoutException($"Timed-out pulling scrobbles, took {config.Timeout}"); + } + }); } + private IEnumerable GetDbScrobbles() + { + var currentScrobbles = db.Scrobble.AsEnumerable() + .Where(s => s.UserId == config.User.Id); + + if (config.From is not null) + { + currentScrobbles = currentScrobbles.Where(s => s.Timestamp > config.From); + } + + if (config.To is not null) + { + currentScrobbles = currentScrobbles.Where(s => s.Timestamp < config.To); + } + + return currentScrobbles; + } + + private IEnumerable GetRequestsFromPageNumbers(int start, int totalPages) + => Enumerable.Range(start, totalPages - 1) + .Select(n => new ScrobbleRequest( + userClient, + loggerFactory.CreateLogger() ?? NullLogger.Instance, + config.User.UserName, + n, + config.PageSize, + config.From, + config.To)); + private void IdentifyDuplicates(IEnumerable tracks) { logger.LogDebug("Identifying duplicates"); @@ -186,5 +237,20 @@ namespace Selector logger.LogInformation("Duplicate at {0}: {1}", dupe.Key, dupeString.ToString()); } } + + private bool RemoveNowPlaying(List scrobbles) + { + var newestScrobble = scrobbles.FirstOrDefault(); + if (newestScrobble is not null) + { + if (newestScrobble.IsNowPlaying is bool playing && playing) + { + scrobbles.Remove(newestScrobble); + return true; + } + } + + return false; + } } } diff --git a/Selector/Scrobble/ScrobbleRequest.cs b/Selector/Scrobble/ScrobbleRequest.cs new file mode 100644 index 0000000..f8c2d3f --- /dev/null +++ b/Selector/Scrobble/ScrobbleRequest.cs @@ -0,0 +1,102 @@ +using IF.Lastfm.Core.Api; +using IF.Lastfm.Core.Api.Helpers; +using IF.Lastfm.Core.Objects; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Selector +{ + public class ScrobbleRequest + { + private readonly ILogger logger; + private readonly IUserApi userClient; + + public event EventHandler Success; + + public int MaxAttempts { get; private set; } = 5; + public int Attempts { get; private set; } + public List Scrobbles { get; private set; } + public int TotalPages { get; private set; } + private Task> currentTask { get; set; } + public bool Succeeded { get; private set; } = false; + + private string username { get; set; } + private int pageNumber { get; set; } + int pageSize { get; set; } + DateTime? from { get; set; } + DateTime? to { get; set; } + + private TaskCompletionSource AggregateTaskSource { get; set; } = new(); + public Task Task => AggregateTaskSource.Task; + + public ScrobbleRequest(IUserApi _userClient, ILogger _logger, string _username, int _pageNumber, int _pageSize, DateTime? _from, DateTime? _to) + { + userClient = _userClient; + logger = _logger; + + username = _username; + pageNumber = _pageNumber; + pageSize = _pageSize; + from = _from; + to = _to; + } + + protected virtual void RaiseSampleEvent() + { + // Raise the event in a thread-safe manner using the ?. operator. + Success?.Invoke(this, new EventArgs()); + } + + public Task Execute() + { + logger.LogInformation("Scrobble request #{} for {} by {} from {} to {}", pageNumber, username, pageSize, from, to); + currentTask = userClient.GetRecentScrobbles(username, pagenumber: pageNumber, count: pageSize, from: from, to: to); + currentTask.ContinueWith(t => + { + if (t.IsCompletedSuccessfully) + { + var result = t.Result; + Succeeded = result.Success; + + if (Succeeded) + { + Scrobbles = result.Content.ToList(); + TotalPages = result.TotalPages; + OnSuccess(); + AggregateTaskSource.SetResult(); + } + else + { + if(Attempts < MaxAttempts) + { + logger.LogDebug("Request failed for {}, #{} by {}: {}, retrying ({} of {})", username, pageNumber, pageSize, result.Status, Attempts + 1, MaxAttempts); + Execute(); + } + else + { + logger.LogDebug("Request failed for {}, #{} by {}: {}, max retries exceeded {}, not retrying", username, pageNumber, pageSize, result.Status, MaxAttempts); + AggregateTaskSource.SetCanceled(); + } + } + } + else + { + logger.LogError("Scrobble request task faulted, {}", t.Exception); + AggregateTaskSource.SetException(t.Exception); + } + }); + + Attempts++; + return Task; + } + + protected virtual void OnSuccess() + { + // Raise the event in a thread-safe manner using the ?. operator. + Success?.Invoke(this, new EventArgs()); + } + } +}