UserEventFirer, web hook tests

This commit is contained in:
andy 2021-12-20 23:04:53 +00:00
parent 8174f9f6f6
commit 7a56f7f586
14 changed files with 454 additions and 161 deletions

View File

@ -30,6 +30,8 @@ namespace Selector.CLI
private readonly IAudioFeatureInjectorFactory AudioFeatureInjectorFactory;
private readonly IPlayCounterFactory PlayCounterFactory;
private readonly IUserEventFirerFactory UserEventFirerFactory;
private readonly IPublisherFactory PublisherFactory;
private readonly ICacheWriterFactory CacheWriterFactory;
private ConcurrentDictionary<string, IWatcherCollection> Watchers { get; set; } = new();
@ -48,7 +50,9 @@ namespace Selector.CLI
IServiceProvider serviceProvider,
IPublisherFactory publisherFactory = null,
ICacheWriterFactory cacheWriterFactory = null
ICacheWriterFactory cacheWriterFactory = null,
IUserEventFirerFactory userEventFirerFactory = null
)
{
Logger = logger;
@ -62,6 +66,8 @@ namespace Selector.CLI
AudioFeatureInjectorFactory = audioFeatureInjectorFactory;
PlayCounterFactory = playCounterFactory;
UserEventFirerFactory = userEventFirerFactory;
PublisherFactory = publisherFactory;
CacheWriterFactory = cacheWriterFactory;
}
@ -123,6 +129,8 @@ namespace Selector.CLI
if (CacheWriterFactory is not null) consumers.Add(await CacheWriterFactory.Get());
if (PublisherFactory is not null) consumers.Add(await PublisherFactory.Get());
if (UserEventFirerFactory is not null) consumers.Add(await UserEventFirerFactory.Get());
if (!string.IsNullOrWhiteSpace(dbWatcher.User.LastFmUsername))
{
consumers.Add(await PlayCounterFactory.Get(creds: new() { Username = dbWatcher.User.LastFmUsername }));

View File

@ -125,8 +125,8 @@ namespace Selector.CLI
services.AddRedisServices(config.RedisOptions.ConnectionString);
Console.WriteLine("> Adding cache event maps...");
services.AddTransient<IEventMapping, SpotifyLinkFromCacheMapping>();
services.AddTransient<IEventMapping, LastfmFromCacheMapping>();
services.AddTransient<IEventMapping, FromPubSub.SpotifyLink>();
services.AddTransient<IEventMapping, FromPubSub.Lastfm>();
Console.WriteLine("> Adding caching Spotify consumers...");
services.AddCachingSpotify();

View File

@ -14,74 +14,81 @@ namespace Selector.Events
public string NewUsername { get; set; }
}
public class LastfmFromCacheMapping : IEventMapping
public partial class FromPubSub
{
private readonly ILogger<LastfmFromCacheMapping> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public LastfmFromCacheMapping(ILogger<LastfmFromCacheMapping> logger,
ISubscriber subscriber,
UserEventBus userEvent)
public class Lastfm : IEventMapping
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
private readonly ILogger<Lastfm> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public async Task ConstructMapping()
{
Logger.LogDebug("Forming Last.fm username event mapping FROM cache TO event bus");
public Lastfm(ILogger<Lastfm> logger,
ISubscriber subscriber,
UserEventBus userEvent)
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
(await Subscriber.SubscribeAsync(Key.AllUserLastfm)).OnMessage(message => {
try{
var userId = Key.Param(message.Channel);
public async Task ConstructMapping()
{
Logger.LogDebug("Forming Last.fm username event mapping FROM cache TO event bus");
var deserialised = JsonSerializer.Deserialize<LastfmChange>(message.Message);
Logger.LogDebug("Received new Last.fm username event for [{userId}]", deserialised.UserId);
(await Subscriber.SubscribeAsync(Key.AllUserLastfm)).OnMessage(message => {
if (!userId.Equals(deserialised.UserId))
try
{
Logger.LogWarning("Serialised user ID [{}] does not match cache channel [{}]", userId, deserialised.UserId);
}
var userId = Key.Param(message.Channel);
UserEvent.OnLastfmCredChange(this, deserialised);
}
catch(Exception e)
{
Logger.LogError(e, "Error parsing Last.fm username event");
}
});
var deserialised = JsonSerializer.Deserialize<LastfmChange>(message.Message);
Logger.LogDebug("Received new Last.fm username event for [{userId}]", deserialised.UserId);
if (!userId.Equals(deserialised.UserId))
{
Logger.LogWarning("Serialised user ID [{}] does not match cache channel [{}]", userId, deserialised.UserId);
}
UserEvent.OnLastfmCredChange(this, deserialised);
}
catch (Exception e)
{
Logger.LogError(e, "Error parsing Last.fm username event");
}
});
}
}
}
public class LastfmToCacheMapping : IEventMapping
public partial class ToPubSub
{
private readonly ILogger<LastfmToCacheMapping> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public LastfmToCacheMapping(ILogger<LastfmToCacheMapping> logger,
ISubscriber subscriber,
UserEventBus userEvent)
public class Lastfm : IEventMapping
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
private readonly ILogger<Lastfm> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public Task ConstructMapping()
{
Logger.LogDebug("Forming Last.fm username event mapping TO cache FROM event bus");
UserEvent.LastfmCredChange += async (o, e) =>
public Lastfm(ILogger<Lastfm> logger,
ISubscriber subscriber,
UserEventBus userEvent)
{
var payload = JsonSerializer.Serialize(e);
await Subscriber.PublishAsync(Key.UserLastfm(e.UserId), payload);
};
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
return Task.CompletedTask;
public Task ConstructMapping()
{
Logger.LogDebug("Forming Last.fm username event mapping TO cache FROM event bus");
UserEvent.LastfmCredChange += async (o, e) =>
{
var payload = JsonSerializer.Serialize(e);
await Subscriber.PublishAsync(Key.UserLastfm(e.UserId), payload);
};
return Task.CompletedTask;
}
}
}
}

View File

@ -7,40 +7,78 @@ using Selector.Cache;
namespace Selector.Events
{
public class NowPlayingFromCacheMapping : IEventMapping
public partial class FromPubSub
{
private readonly ILogger<NowPlayingFromCacheMapping> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public NowPlayingFromCacheMapping(ILogger<NowPlayingFromCacheMapping> logger,
ISubscriber subscriber,
UserEventBus userEvent)
public class NowPlaying : IEventMapping
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
private readonly ILogger<NowPlaying> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public NowPlaying(ILogger<NowPlaying> logger,
ISubscriber subscriber,
UserEventBus userEvent)
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
public async Task ConstructMapping()
{
Logger.LogDebug("Forming now playing event mapping between cache and event bus");
(await Subscriber.SubscribeAsync(Key.AllCurrentlyPlaying)).OnMessage(message => {
try
{
var userId = Key.Param(message.Channel);
var deserialised = JsonSerializer.Deserialize<CurrentlyPlayingDTO>(message.Message);
Logger.LogDebug("Received new currently playing [{username}]", deserialised.Username);
UserEvent.OnCurrentlyPlayingChange(this, userId, deserialised);
}
catch (Exception e)
{
Logger.LogError(e, $"Error parsing new currently playing [{message}]");
}
});
}
}
}
public async Task ConstructMapping()
public partial class ToPubSub
{
public class NowPlaying : IEventMapping
{
Logger.LogDebug("Forming now playing event mapping between cache and event bus");
private readonly ILogger<NowPlaying> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
(await Subscriber.SubscribeAsync(Key.AllCurrentlyPlaying)).OnMessage(message => {
try{
var userId = Key.Param(message.Channel);
public NowPlaying(ILogger<NowPlaying> logger,
ISubscriber subscriber,
UserEventBus userEvent)
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
var deserialised = JsonSerializer.Deserialize<CurrentlyPlayingDTO>(message.Message);
Logger.LogDebug("Received new currently playing [{username}]", deserialised.Username);
public Task ConstructMapping()
{
Logger.LogDebug("Forming now playing event mapping TO cache FROM event bus");
UserEvent.OnCurrentlyPlayingChange(this, userId, deserialised);
}
catch(Exception e)
UserEvent.CurrentlyPlaying += async (o, e) =>
{
Logger.LogError(e, $"Error parsing new currently playing [{message}]");
}
});
(string id, CurrentlyPlayingDTO args) = e;
var payload = JsonSerializer.Serialize(e);
await Subscriber.PublishAsync(Key.CurrentlyPlaying(id), payload);
};
return Task.CompletedTask;
}
}
}
}

View File

@ -14,78 +14,85 @@ namespace Selector.Events
public bool NewLinkState { get; set; }
}
public class SpotifyLinkFromCacheMapping : IEventMapping
public partial class FromPubSub
{
private readonly ILogger<SpotifyLinkFromCacheMapping> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public SpotifyLinkFromCacheMapping(ILogger<SpotifyLinkFromCacheMapping> logger,
ISubscriber subscriber,
UserEventBus userEvent)
public class SpotifyLink : IEventMapping
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
private readonly ILogger<SpotifyLink> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public async Task ConstructMapping()
{
Logger.LogDebug("Forming Spotify link event mapping FROM cache TO event bus");
public SpotifyLink(ILogger<SpotifyLink> logger,
ISubscriber subscriber,
UserEventBus userEvent)
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
(await Subscriber.SubscribeAsync(Key.AllUserSpotify)).OnMessage(message => {
try{
var userId = Key.Param(message.Channel);
public async Task ConstructMapping()
{
Logger.LogDebug("Forming Spotify link event mapping FROM cache TO event bus");
var deserialised = JsonSerializer.Deserialize<SpotifyLinkChange>(message.Message);
Logger.LogDebug("Received new Spotify link event for [{userId}]", deserialised.UserId);
(await Subscriber.SubscribeAsync(Key.AllUserSpotify)).OnMessage(message => {
if (!userId.Equals(deserialised.UserId))
try
{
Logger.LogWarning("Serialised user ID [{}] does not match cache channel [{}]", userId, deserialised.UserId);
}
var userId = Key.Param(message.Channel);
UserEvent.OnSpotifyLinkChange(this, deserialised);
}
catch(TaskCanceledException)
{
Logger.LogDebug("Task Cancelled");
}
catch(Exception e)
{
Logger.LogError(e, "Error parsing new Spotify link event");
}
});
var deserialised = JsonSerializer.Deserialize<SpotifyLinkChange>(message.Message);
Logger.LogDebug("Received new Spotify link event for [{userId}]", deserialised.UserId);
if (!userId.Equals(deserialised.UserId))
{
Logger.LogWarning("Serialised user ID [{}] does not match cache channel [{}]", userId, deserialised.UserId);
}
UserEvent.OnSpotifyLinkChange(this, deserialised);
}
catch (TaskCanceledException)
{
Logger.LogDebug("Task Cancelled");
}
catch (Exception e)
{
Logger.LogError(e, "Error parsing new Spotify link event");
}
});
}
}
}
public class SpotifyLinkToCacheMapping : IEventMapping
public partial class ToPubSub
{
private readonly ILogger<SpotifyLinkToCacheMapping> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public SpotifyLinkToCacheMapping(ILogger<SpotifyLinkToCacheMapping> logger,
ISubscriber subscriber,
UserEventBus userEvent)
public class SpotifyLink : IEventMapping
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
private readonly ILogger<SpotifyLink> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public Task ConstructMapping()
{
Logger.LogDebug("Forming Spotify link event mapping TO cache FROM event bus");
UserEvent.SpotifyLinkChange += async (o, e) =>
public SpotifyLink(ILogger<SpotifyLink> logger,
ISubscriber subscriber,
UserEventBus userEvent)
{
var payload = JsonSerializer.Serialize(e);
await Subscriber.PublishAsync(Key.UserSpotify(e.UserId), payload);
};
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
return Task.CompletedTask;
public Task ConstructMapping()
{
Logger.LogDebug("Forming Spotify link event mapping TO cache FROM event bus");
UserEvent.SpotifyLinkChange += async (o, e) =>
{
var payload = JsonSerializer.Serialize(e);
await Subscriber.PublishAsync(Key.UserSpotify(e.UserId), payload);
};
return Task.CompletedTask;
}
}
}
}

View File

@ -0,0 +1,83 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Selector.Events;
namespace Selector
{
public class UserEventFirer : IConsumer
{
protected readonly IPlayerWatcher Watcher;
protected readonly ILogger<UserEventFirer> Logger;
protected readonly UserEventBus UserEvent;
public CancellationToken CancelToken { get; set; }
public UserEventFirer(
IPlayerWatcher watcher,
UserEventBus userEvent,
ILogger<UserEventFirer> logger = null,
CancellationToken token = default
)
{
Watcher = watcher;
UserEvent = userEvent;
Logger = logger ?? NullLogger<UserEventFirer>.Instance;
CancelToken = token;
}
public void Callback(object sender, ListeningChangeEventArgs e)
{
if (e.Current is null) return;
Task.Run(async () => {
try
{
await AsyncCallback(e);
}
catch (Exception e)
{
Logger.LogError(e, "Error occured during callback");
}
}, CancelToken);
}
public Task AsyncCallback(ListeningChangeEventArgs e)
{
Logger.LogDebug("Firing now playing event on user bus [{username}/{userId}]", e.SpotifyUsername, e.Id);
UserEvent.OnCurrentlyPlayingChange(this, e.Id, (CurrentlyPlayingDTO) e);
return Task.CompletedTask;
}
public void Subscribe(IWatcher watch = null)
{
var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided");
if (watcher is IPlayerWatcher watcherCast)
{
watcherCast.ItemChange += Callback;
}
else
{
throw new ArgumentException("Provided watcher is not a PlayerWatcher");
}
}
public void Unsubscribe(IWatcher watch = null)
{
var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided");
if (watcher is IPlayerWatcher watcherCast)
{
watcherCast.ItemChange -= Callback;
}
else
{
throw new ArgumentException("Provided watcher is not a PlayerWatcher");
}
}
}
}

View File

@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Selector.Events;
namespace Selector
{
public interface IUserEventFirerFactory
{
public Task<UserEventFirer> Get(IPlayerWatcher watcher = null);
}
public class UserEventFirerFactory: IUserEventFirerFactory
{
private readonly ILoggerFactory LoggerFactory;
private readonly UserEventBus UserEvent;
public UserEventFirerFactory(ILoggerFactory loggerFactory, UserEventBus userEvent)
{
LoggerFactory = loggerFactory;
UserEvent = userEvent;
}
public Task<UserEventFirer> Get(IPlayerWatcher watcher = null)
{
return Task.FromResult(new UserEventFirer(
watcher,
UserEvent,
LoggerFactory.CreateLogger<UserEventFirer>()
));
}
}
}

View File

@ -8,6 +8,9 @@ namespace Selector.Events
{
services.AddEventBus();
services.AddEventMappingAgent();
services.AddTransient<IUserEventFirerFactory, UserEventFirerFactory>();
services.AddTransient<UserEventFirerFactory>();
}
public static void AddEventBus(this IServiceCollection services)

View File

@ -1,9 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging;
using Selector.Model;

View File

@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using Xunit;
using Moq;
using Moq.Protected;
using FluentAssertions;
using System.Net;
using SpotifyAPI.Web;
namespace Selector.Tests
{
public class WebHookTest
{
[Fact(Skip = "Not working atm")]
public async Task TestHttpClientUsed()
{
var msg = new HttpResponseMessage(HttpStatusCode.OK);
var httpHandlerMock = new Mock<HttpMessageHandler>();
httpHandlerMock.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.ReturnsAsync(msg);
var watcherMock = new Mock<IPlayerWatcher>();
watcherMock.SetupAdd(w => w.ItemChange += It.IsAny<EventHandler<ListeningChangeEventArgs>>());
watcherMock.SetupRemove(w => w.ItemChange -= It.IsAny<EventHandler<ListeningChangeEventArgs>>());
var link = "https://link";
var content = new StringContent("");
var config = new WebHookConfig()
{
Url = link,
Content = content,
};
var http = new HttpClient(httpHandlerMock.Object);
var webHook = new WebHook(watcherMock.Object, http, config);
webHook.Subscribe();
watcherMock.Raise(w => w.ItemChange += null, this, new ListeningChangeEventArgs());
await Task.Delay(100);
httpHandlerMock.Protected().Verify<Task<HttpResponseMessage>>("SendAsync", Times.Once(), ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>());
}
[Theory]
[InlineData(200, true, true)]
[InlineData(404, true, false)]
[InlineData(500, true, false)]
public async Task TestEventFiring(int code, bool predicate, bool successful)
{
var msg = new HttpResponseMessage(Enum.Parse<HttpStatusCode>(code.ToString()));
var httpHandlerMock = new Mock<HttpMessageHandler>();
httpHandlerMock.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.ReturnsAsync(msg);
var watcherMock = new Mock<IPlayerWatcher>();
var link = "https://link";
var content = new StringContent("");
var config = new WebHookConfig()
{
Url = link,
Content = content,
};
var http = new HttpClient(httpHandlerMock.Object);
bool predicateEvent = false, successfulEvent = false, failedEvent = false;
var webHook = new WebHook(watcherMock.Object, http, config);
webHook.PredicatePass += (o, e) =>
{
predicateEvent = predicate;
};
webHook.SuccessfulRequest += (o, e) =>
{
successfulEvent = successful;
};
webHook.FailedRequest += (o, e) =>
{
failedEvent = !successful;
};
await webHook.AsyncCallback(ListeningChangeEventArgs.From(new (), new (), new()));
predicateEvent.Should().Be(predicate);
successfulEvent.Should().Be(successful);
failedEvent.Should().Be(!successful);
}
}
}

View File

@ -5,7 +5,7 @@
ViewData["ActivePage"] = ManageNavPages.LastFm;
}
<h4>@ViewData["Title"] <a href="https://last.fm"><img src="/last-fm.png" class="lastfm-logo central" /></a></h4>
<h4>@ViewData["Title"] <a href="https://last.fm" target="_blank"><img src="/last-fm.png" class="lastfm-logo central" /></a></h4>
<partial name="_StatusMessage" model="Model.StatusMessage" />
<div class="row">
<div class="col-md-6">

View File

@ -5,7 +5,7 @@
ViewData["ActivePage"] = ManageNavPages.Spotify;
}
<h4>@ViewData["Title"] <a href="https://spotify.com"><img src="/Spotify_Icon_RGB_White.png" class="spotify-logo central" /></a></h4>
<h4>@ViewData["Title"] <a href="https://spotify.com" target="_blank"><img src="/Spotify_Icon_RGB_White.png" class="spotify-logo central" /></a></h4>
<partial name="_StatusMessage" model="Model.StatusMessage" />
<div class="row">

View File

@ -107,9 +107,9 @@ namespace Selector.Web
Console.WriteLine("> Adding cache event maps...");
services.AddTransient<IEventMapping, SpotifyLinkToCacheMapping>();
services.AddTransient<IEventMapping, LastfmToCacheMapping>();
services.AddTransient<IEventMapping, NowPlayingFromCacheMapping>();
services.AddTransient<IEventMapping, ToPubSub.SpotifyLink>();
services.AddTransient<IEventMapping, ToPubSub.Lastfm>();
services.AddTransient<IEventMapping, FromPubSub.NowPlaying>();
services.AddCacheHubProxy();

View File

@ -38,9 +38,9 @@ namespace Selector
protected readonly WebHookConfig Config;
protected event EventHandler PredicatePass;
protected event EventHandler SuccessfulRequest;
protected event EventHandler FailedRequest;
public event EventHandler PredicatePass;
public event EventHandler SuccessfulRequest;
public event EventHandler FailedRequest;
public CancellationToken CancelToken { get; set; }
@ -81,20 +81,31 @@ namespace Selector
{
if(Config.ShouldRequest(e))
{
Logger.LogDebug("[{name}] predicate passed, making request to [{url}]", Config.Name, Config.Url);
var response = await HttpClient.PostAsync(Config.Url, Config.Content, CancelToken);
OnPredicatePass(new EventArgs());
if (response.IsSuccessStatusCode)
try
{
Logger.LogDebug("[{name}] request success", Config.Name);
OnSuccessfulRequest(new EventArgs());
Logger.LogDebug("[{name}] predicate passed, making request to [{url}]", Config.Name, Config.Url);
OnPredicatePass(new EventArgs());
var response = await HttpClient.PostAsync(Config.Url, Config.Content, CancelToken);
if (response.IsSuccessStatusCode)
{
Logger.LogDebug("[{name}] request success", Config.Name);
OnSuccessfulRequest(new EventArgs());
}
else
{
Logger.LogDebug("[{name}] request failed [{error}] [{content}]", Config.Name, response.StatusCode, response.Content);
OnFailedRequest(new EventArgs());
}
}
else
catch(HttpRequestException ex)
{
Logger.LogDebug("[{name}] request failed [{error}] [{content}]", Config.Name, response.StatusCode, response.Content);
OnFailedRequest(new EventArgs());
Logger.LogError(ex, "Exception occured during request");
}
catch (TaskCanceledException)
{
Logger.LogDebug("Task cancelled");
}
}
else