From 3dbe9752208995e0fe5acf98f08d9043fb1ddbd2 Mon Sep 17 00:00:00 2001 From: andy Date: Sat, 18 Dec 2021 23:06:21 +0000 Subject: [PATCH] adding web hook consumer, wrapping consumer callbacks in try/catches --- .../Consumer/AudioInjectorCaching.cs | 12 +- .../Consumer/CacheWriterConsumer.cs | 12 +- Selector.Cache/Consumer/PlayCounterCaching.cs | 11 +- Selector.Cache/Consumer/PublisherConsumer.cs | 11 +- Selector/Consumers/AudioFeatureInjector.cs | 11 +- Selector/Consumers/Factory/WebHookFactory.cs | 37 +++++ Selector/Consumers/PlayCounter.cs | 11 +- Selector/Consumers/WebHook.cs | 149 ++++++++++++++++++ Selector/Extensions/ServiceExtensions.cs | 5 +- Selector/Watcher/PlayerWatcher.cs | 2 +- 10 files changed, 252 insertions(+), 9 deletions(-) create mode 100644 Selector/Consumers/Factory/WebHookFactory.cs create mode 100644 Selector/Consumers/WebHook.cs diff --git a/Selector.Cache/Consumer/AudioInjectorCaching.cs b/Selector.Cache/Consumer/AudioInjectorCaching.cs index 9f5e4a6..afe67b9 100644 --- a/Selector.Cache/Consumer/AudioInjectorCaching.cs +++ b/Selector.Cache/Consumer/AudioInjectorCaching.cs @@ -30,7 +30,17 @@ namespace Selector.Cache public void CacheCallback(object sender, AnalysedTrack e) { - Task.Run(async () => { await AsyncCacheCallback(e); }, CancelToken); + Task.Run(async () => + { + try + { + await AsyncCacheCallback(e); + } + catch (Exception e) + { + Logger.LogError(e, "Error occured during callback"); + } + }, CancelToken); } public async Task AsyncCacheCallback(AnalysedTrack e) diff --git a/Selector.Cache/Consumer/CacheWriterConsumer.cs b/Selector.Cache/Consumer/CacheWriterConsumer.cs index 5baff09..6358c1c 100644 --- a/Selector.Cache/Consumer/CacheWriterConsumer.cs +++ b/Selector.Cache/Consumer/CacheWriterConsumer.cs @@ -35,7 +35,17 @@ namespace Selector.Cache { if (e.Current is null) return; - Task.Run(async () => { await AsyncCallback(e); }, CancelToken); + Task.Run(async () => { + try + { + await AsyncCallback(e); + } + catch (Exception e) + { + Logger.LogError(e, "Error occured during callback"); + } + + }, CancelToken); } public async Task AsyncCallback(ListeningChangeEventArgs e) diff --git a/Selector.Cache/Consumer/PlayCounterCaching.cs b/Selector.Cache/Consumer/PlayCounterCaching.cs index d098d02..3b85ee5 100644 --- a/Selector.Cache/Consumer/PlayCounterCaching.cs +++ b/Selector.Cache/Consumer/PlayCounterCaching.cs @@ -37,7 +37,16 @@ namespace Selector.Cache public void CacheCallback(object sender, PlayCount e) { - Task.Run(async () => { await AsyncCacheCallback(e); }, CancelToken); + Task.Run(async () => { + try + { + await AsyncCacheCallback(e); + } + catch (Exception e) + { + Logger.LogError(e, "Error occured during callback"); + } + }, CancelToken); } public async Task AsyncCacheCallback(PlayCount e) diff --git a/Selector.Cache/Consumer/PublisherConsumer.cs b/Selector.Cache/Consumer/PublisherConsumer.cs index 996aff7..1b25158 100644 --- a/Selector.Cache/Consumer/PublisherConsumer.cs +++ b/Selector.Cache/Consumer/PublisherConsumer.cs @@ -34,7 +34,16 @@ namespace Selector.Cache { if (e.Current is null) return; - Task.Run(async () => { await AsyncCallback(e); }, CancelToken); + Task.Run(async () => { + try + { + await AsyncCallback(e); + } + catch (Exception e) + { + Logger.LogError(e, "Error occured during callback"); + } + }, CancelToken); } public async Task AsyncCallback(ListeningChangeEventArgs e) diff --git a/Selector/Consumers/AudioFeatureInjector.cs b/Selector/Consumers/AudioFeatureInjector.cs index f51b9c6..3ea72ce 100644 --- a/Selector/Consumers/AudioFeatureInjector.cs +++ b/Selector/Consumers/AudioFeatureInjector.cs @@ -37,7 +37,16 @@ namespace Selector { if (e.Current is null) return; - Task.Run(async () => { await AsyncCallback(e); }, CancelToken); + Task.Run(async () => { + try + { + await AsyncCallback(e); + } + catch (Exception e) + { + Logger.LogError(e, "Error occured during callback"); + } + }, CancelToken); } public async Task AsyncCallback(ListeningChangeEventArgs e) diff --git a/Selector/Consumers/Factory/WebHookFactory.cs b/Selector/Consumers/Factory/WebHookFactory.cs new file mode 100644 index 0000000..ac971cb --- /dev/null +++ b/Selector/Consumers/Factory/WebHookFactory.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +using System.Net.Http; + +namespace Selector +{ + public interface IWebHookFactory + { + public Task Get(WebHookConfig config, IPlayerWatcher watcher = null); + } + + public class WebHookFactory: IWebHookFactory + { + private readonly ILoggerFactory LoggerFactory; + private readonly HttpClient Http; + + public WebHookFactory(ILoggerFactory loggerFactory, HttpClient httpClient) + { + LoggerFactory = loggerFactory; + Http = httpClient; + } + + public Task Get(WebHookConfig config, IPlayerWatcher watcher = null) + { + return Task.FromResult(new WebHook( + watcher, + Http, + config, + LoggerFactory.CreateLogger() + )); + } + } +} diff --git a/Selector/Consumers/PlayCounter.cs b/Selector/Consumers/PlayCounter.cs index 90290eb..b532cc2 100644 --- a/Selector/Consumers/PlayCounter.cs +++ b/Selector/Consumers/PlayCounter.cs @@ -52,7 +52,16 @@ namespace Selector { if (e.Current is null) return; - Task.Run(async () => { await AsyncCallback(e); }, CancelToken); + Task.Run(async () => { + try + { + await AsyncCallback(e); + } + catch (Exception e) + { + Logger.LogError(e, "Error occured during callback"); + } + }, CancelToken); } public async Task AsyncCallback(ListeningChangeEventArgs e) diff --git a/Selector/Consumers/WebHook.cs b/Selector/Consumers/WebHook.cs new file mode 100644 index 0000000..01d2de0 --- /dev/null +++ b/Selector/Consumers/WebHook.cs @@ -0,0 +1,149 @@ +using System; +using System.Net.Http; +using System.Linq; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Selector +{ + public class WebHookConfig + { + public string Name { get; set; } + public IEnumerable> Predicates { get; set; } + public string Url { get; set; } + public HttpContent Content { get; set; } + + public bool ShouldRequest(ListeningChangeEventArgs e) + { + if(Predicates is not null) + { + return Predicates.Select(p => p(e)).Aggregate((a, b) => a && b); + } + else + { + return true; + } + } + } + + public class WebHook : IConsumer + { + protected readonly IPlayerWatcher Watcher; + protected readonly HttpClient HttpClient; + protected readonly ILogger Logger; + + protected readonly WebHookConfig Config; + + protected event EventHandler PredicatePass; + protected event EventHandler SuccessfulRequest; + protected event EventHandler FailedRequest; + + public CancellationToken CancelToken { get; set; } + + public AnalysedTrackTimeline Timeline { get; set; } = new(); + + public WebHook( + IPlayerWatcher watcher, + HttpClient httpClient, + WebHookConfig config, + ILogger logger = null, + CancellationToken token = default + ) + { + Watcher = watcher; + HttpClient = httpClient; + Config = config; + Logger = logger ?? NullLogger.Instance; + CancelToken = token; + } + + public void Callback(object sender, ListeningChangeEventArgs e) + { + if (e.Current is null) return; + + Task.Run(async () => { + try + { + await AsyncCallback(e); + } + catch (Exception e) + { + Logger.LogError(e, "Error occured during callback"); + } + }, CancelToken); + } + + public async Task AsyncCallback(ListeningChangeEventArgs e) + { + if(Config.ShouldRequest(e)) + { + Logger.LogDebug("[{name}] predicate passed, making request to [{url}]", Config.Name, Config.Url); + var response = await HttpClient.PostAsync(Config.Url, Config.Content, CancelToken); + + OnPredicatePass(new EventArgs()); + + if (response.IsSuccessStatusCode) + { + Logger.LogDebug("[{name}] request success", Config.Name); + OnSuccessfulRequest(new EventArgs()); + } + else + { + Logger.LogDebug("[{name}] request failed [{error}] [{content}]", Config.Name, response.StatusCode, response.Content); + OnFailedRequest(new EventArgs()); + } + } + else + { + Logger.LogTrace("[{name}] predicate failed, skipping", Config.Name); + } + } + + public void Subscribe(IWatcher watch = null) + { + var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided"); + + if (watcher is IPlayerWatcher watcherCast) + { + watcherCast.ItemChange += Callback; + } + else + { + throw new ArgumentException("Provided watcher is not a PlayerWatcher"); + } + } + + public void Unsubscribe(IWatcher watch = null) + { + var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided"); + + if (watcher is IPlayerWatcher watcherCast) + { + watcherCast.ItemChange -= Callback; + } + else + { + throw new ArgumentException("Provided watcher is not a PlayerWatcher"); + } + } + + protected virtual void OnPredicatePass(EventArgs args) + { + PredicatePass?.Invoke(this, args); + } + + protected virtual void OnSuccessfulRequest(EventArgs args) + { + SuccessfulRequest?.Invoke(this, args); + } + + protected virtual void OnFailedRequest(EventArgs args) + { + FailedRequest?.Invoke(this, args); + } + } +} diff --git a/Selector/Extensions/ServiceExtensions.cs b/Selector/Extensions/ServiceExtensions.cs index e322422..4edc3d3 100644 --- a/Selector/Extensions/ServiceExtensions.cs +++ b/Selector/Extensions/ServiceExtensions.cs @@ -16,6 +16,9 @@ namespace Selector.Extensions services.AddTransient(); services.AddTransient(); + + services.AddTransient(); + services.AddTransient(); } public static void AddSpotify(this IServiceCollection services) @@ -39,8 +42,6 @@ namespace Selector.Extensions services.AddTransient(sp => sp.GetService().Chart); services.AddTransient(sp => sp.GetService().Library); services.AddTransient(sp => sp.GetService().Tag); - - } public static void AddWatcher(this IServiceCollection services) diff --git a/Selector/Watcher/PlayerWatcher.cs b/Selector/Watcher/PlayerWatcher.cs index 24ac44f..d73bee2 100644 --- a/Selector/Watcher/PlayerWatcher.cs +++ b/Selector/Watcher/PlayerWatcher.cs @@ -82,7 +82,7 @@ namespace Selector catch(APITooManyRequestsException e) { Logger.LogDebug($"Too many requests error: [{e.Message}]"); - await Task.Delay(e.RetryAfter); + await Task.Delay(e.RetryAfter, token); // throw e; } catch(APIException e)