adding web hook consumer, wrapping consumer callbacks in try/catches

This commit is contained in:
andy 2021-12-18 23:06:21 +00:00
parent a263941e97
commit 3dbe975220
10 changed files with 252 additions and 9 deletions

View File

@ -30,7 +30,17 @@ namespace Selector.Cache
public void CacheCallback(object sender, AnalysedTrack e)
{
Task.Run(async () => { await AsyncCacheCallback(e); }, CancelToken);
Task.Run(async () =>
{
try
{
await AsyncCacheCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public async Task AsyncCacheCallback(AnalysedTrack e)

View File

@ -35,7 +35,17 @@ namespace Selector.Cache
{
if (e.Current is null) return;
Task.Run(async () => { await AsyncCallback(e); }, CancelToken);
Task.Run(async () => {
try
{
await AsyncCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public async Task AsyncCallback(ListeningChangeEventArgs e)

View File

@ -37,7 +37,16 @@ namespace Selector.Cache
public void CacheCallback(object sender, PlayCount e)
{
Task.Run(async () => { await AsyncCacheCallback(e); }, CancelToken);
Task.Run(async () => {
try
{
await AsyncCacheCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public async Task AsyncCacheCallback(PlayCount e)

View File

@ -34,7 +34,16 @@ namespace Selector.Cache
{
if (e.Current is null) return;
Task.Run(async () => { await AsyncCallback(e); }, CancelToken);
Task.Run(async () => {
try
{
await AsyncCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public async Task AsyncCallback(ListeningChangeEventArgs e)

View File

@ -37,7 +37,16 @@ namespace Selector
{
if (e.Current is null) return;
Task.Run(async () => { await AsyncCallback(e); }, CancelToken);
Task.Run(async () => {
try
{
await AsyncCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public async Task AsyncCallback(ListeningChangeEventArgs e)

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using System.Net.Http;
namespace Selector
{
public interface IWebHookFactory
{
public Task<WebHook> Get(WebHookConfig config, IPlayerWatcher watcher = null);
}
public class WebHookFactory: IWebHookFactory
{
private readonly ILoggerFactory LoggerFactory;
private readonly HttpClient Http;
public WebHookFactory(ILoggerFactory loggerFactory, HttpClient httpClient)
{
LoggerFactory = loggerFactory;
Http = httpClient;
}
public Task<WebHook> Get(WebHookConfig config, IPlayerWatcher watcher = null)
{
return Task.FromResult(new WebHook(
watcher,
Http,
config,
LoggerFactory.CreateLogger<WebHook>()
));
}
}
}

View File

@ -52,7 +52,16 @@ namespace Selector
{
if (e.Current is null) return;
Task.Run(async () => { await AsyncCallback(e); }, CancelToken);
Task.Run(async () => {
try
{
await AsyncCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public async Task AsyncCallback(ListeningChangeEventArgs e)

View File

@ -0,0 +1,149 @@
using System;
using System.Net.Http;
using System.Linq;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Selector
{
public class WebHookConfig
{
public string Name { get; set; }
public IEnumerable<Predicate<ListeningChangeEventArgs>> Predicates { get; set; }
public string Url { get; set; }
public HttpContent Content { get; set; }
public bool ShouldRequest(ListeningChangeEventArgs e)
{
if(Predicates is not null)
{
return Predicates.Select(p => p(e)).Aggregate((a, b) => a && b);
}
else
{
return true;
}
}
}
public class WebHook : IConsumer
{
protected readonly IPlayerWatcher Watcher;
protected readonly HttpClient HttpClient;
protected readonly ILogger<WebHook> Logger;
protected readonly WebHookConfig Config;
protected event EventHandler PredicatePass;
protected event EventHandler SuccessfulRequest;
protected event EventHandler FailedRequest;
public CancellationToken CancelToken { get; set; }
public AnalysedTrackTimeline Timeline { get; set; } = new();
public WebHook(
IPlayerWatcher watcher,
HttpClient httpClient,
WebHookConfig config,
ILogger<WebHook> logger = null,
CancellationToken token = default
)
{
Watcher = watcher;
HttpClient = httpClient;
Config = config;
Logger = logger ?? NullLogger<WebHook>.Instance;
CancelToken = token;
}
public void Callback(object sender, ListeningChangeEventArgs e)
{
if (e.Current is null) return;
Task.Run(async () => {
try
{
await AsyncCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public async Task AsyncCallback(ListeningChangeEventArgs e)
{
if(Config.ShouldRequest(e))
{
Logger.LogDebug("[{name}] predicate passed, making request to [{url}]", Config.Name, Config.Url);
var response = await HttpClient.PostAsync(Config.Url, Config.Content, CancelToken);
OnPredicatePass(new EventArgs());
if (response.IsSuccessStatusCode)
{
Logger.LogDebug("[{name}] request success", Config.Name);
OnSuccessfulRequest(new EventArgs());
}
else
{
Logger.LogDebug("[{name}] request failed [{error}] [{content}]", Config.Name, response.StatusCode, response.Content);
OnFailedRequest(new EventArgs());
}
}
else
{
Logger.LogTrace("[{name}] predicate failed, skipping", Config.Name);
}
}
public void Subscribe(IWatcher watch = null)
{
var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided");
if (watcher is IPlayerWatcher watcherCast)
{
watcherCast.ItemChange += Callback;
}
else
{
throw new ArgumentException("Provided watcher is not a PlayerWatcher");
}
}
public void Unsubscribe(IWatcher watch = null)
{
var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided");
if (watcher is IPlayerWatcher watcherCast)
{
watcherCast.ItemChange -= Callback;
}
else
{
throw new ArgumentException("Provided watcher is not a PlayerWatcher");
}
}
protected virtual void OnPredicatePass(EventArgs args)
{
PredicatePass?.Invoke(this, args);
}
protected virtual void OnSuccessfulRequest(EventArgs args)
{
SuccessfulRequest?.Invoke(this, args);
}
protected virtual void OnFailedRequest(EventArgs args)
{
FailedRequest?.Invoke(this, args);
}
}
}

View File

@ -16,6 +16,9 @@ namespace Selector.Extensions
services.AddTransient<IPlayCounterFactory, PlayCounterFactory>();
services.AddTransient<PlayCounterFactory>();
services.AddTransient<IWebHookFactory, WebHookFactory>();
services.AddTransient<WebHookFactory>();
}
public static void AddSpotify(this IServiceCollection services)
@ -39,8 +42,6 @@ namespace Selector.Extensions
services.AddTransient<IChartApi>(sp => sp.GetService<LastfmClient>().Chart);
services.AddTransient<ILibraryApi>(sp => sp.GetService<LastfmClient>().Library);
services.AddTransient<ITagApi>(sp => sp.GetService<LastfmClient>().Tag);
}
public static void AddWatcher(this IServiceCollection services)

View File

@ -82,7 +82,7 @@ namespace Selector
catch(APITooManyRequestsException e)
{
Logger.LogDebug($"Too many requests error: [{e.Message}]");
await Task.Delay(e.RetryAfter);
await Task.Delay(e.RetryAfter, token);
// throw e;
}
catch(APIException e)