added event bus, proxying cache events to event bus before proxying those to signalr

This commit is contained in:
andy 2021-12-19 13:44:22 +00:00
parent 3dbe975220
commit f79c7111fe
20 changed files with 305 additions and 223 deletions

View File

@ -13,6 +13,7 @@ using Selector.Cache;
using Selector.Cache.Extensions;
using IF.Lastfm.Core.Api;
using Selector.Model.Extensions;
namespace Selector.CLI
{
@ -101,6 +102,7 @@ namespace Selector.CLI
Console.WriteLine("> Configuring...");
// CONFIG
var config = ConfigureOptions(context, services);
services.AddHttpClient();
Console.WriteLine("> Adding Services...");
// SERVICES

View File

@ -56,9 +56,9 @@ namespace Selector.Cache
var tasks = new Task[]
{
Db.StringSetAsync(Key.TrackPlayCount(track.Name, track.Artists[0].Name), e.Track, expiry: CacheExpiry),
Db.StringSetAsync(Key.AlbumPlayCount(track.Album.Name, track.Album.Artists[0].Name), e.Album, expiry: CacheExpiry),
Db.StringSetAsync(Key.ArtistPlayCount(track.Artists[0].Name), e.Artist, expiry: CacheExpiry),
Db.StringSetAsync(Key.TrackPlayCount(e.Username, track.Name, track.Artists[0].Name), e.Track, expiry: CacheExpiry),
Db.StringSetAsync(Key.AlbumPlayCount(e.Username, track.Album.Name, track.Album.Artists[0].Name), e.Album, expiry: CacheExpiry),
Db.StringSetAsync(Key.ArtistPlayCount(e.Username, track.Artists[0].Name), e.Artist, expiry: CacheExpiry),
Db.StringSetAsync(Key.UserPlayCount(e.Username), e.User, expiry: CacheExpiry),
};

View File

@ -1,40 +1,72 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Selector.Cache
{
public class Key
{
public const string CurrentlyPlayingName = "CurrentlyPlaying";
public const char MajorSep = ':';
public const char MinorSep = '.';
public const string TrackName = "Track";
public const string AlbumName = "Album";
public const string ArtistName = "Artist";
public const string UserName = "User";
public const string All = "*";
public const string CurrentlyPlayingName = "CURRENTLY_PLAYING";
public const string AudioFeatureName = "AudioFeature";
public const string PlayCountName = "PlayCount";
public const string TrackName = "TRACK";
public const string AlbumName = "ALBUM";
public const string ArtistName = "ARTIST";
public const string UserName = "USER";
public const string WorkerName = "Worker";
public const string WatcherName = "Watcher";
public const string ReservedName = "Reserved";
public const string AudioFeatureName = "AUDIO_FEATURE";
public const string PlayCountName = "PLAY_COUNT";
public const string SpotifyName = "SPOTIFY";
public const string LastfmName = "LASTFM";
public const string WatcherName = "WATCHER";
/// <summary>
/// Current playback for a user
/// </summary>
/// <param name="user">User's database Id (Guid)</param>
/// <returns></returns>
public static string CurrentlyPlaying(string user) => Namespace(user, CurrentlyPlayingName);
public static string AudioFeature(string trackId) => Namespace(TrackName, trackId, AudioFeatureName);
public static string CurrentlyPlaying(string user) => MajorNamespace(MinorNamespace(UserName, CurrentlyPlayingName), user);
public static readonly string AllCurrentlyPlaying = CurrentlyPlaying(All);
public static string TrackPlayCount(string name, string artist) => Namespace(TrackName, artist, name, PlayCountName);
public static string AlbumPlayCount(string name, string artist) => Namespace(AlbumName, artist, name, PlayCountName);
public static string ArtistPlayCount(string name) => Namespace(ArtistName, name, PlayCountName);
public static string UserPlayCount(string username) => Namespace(UserName, username, PlayCountName);
public static string AudioFeature(string trackId) => MajorNamespace(MinorNamespace(TrackName, AudioFeatureName), trackId);
public static readonly string AllAudioFeatures = AudioFeature(All);
public static string WatcherReserved(int id) => Namespace(WatcherName, id.ToString(), ReservedName);
public static string TrackPlayCount(string username, string name, string artist) => MajorNamespace(MinorNamespace(TrackName, PlayCountName), artist, name, username);
public static string AlbumPlayCount(string username, string name, string artist) => MajorNamespace(MinorNamespace(AlbumName, PlayCountName), artist, name, username);
public static string ArtistPlayCount(string username, string name) => MajorNamespace(MinorNamespace(ArtistName, PlayCountName), name, username);
public static string UserPlayCount(string username) => MajorNamespace(MinorNamespace(UserName, PlayCountName), username);
public static string Namespace(params string[] args) => string.Join(":", args);
public static string UserSpotify(string username) => MajorNamespace(MinorNamespace(UserName, SpotifyName), username);
public static readonly string AllUserSpotify = UserSpotify(All);
public static string UserLastfm(string username) => MajorNamespace(MinorNamespace(UserName, LastfmName), username);
public static readonly string AllUserLastfm = UserLastfm(All);
public static string Watcher(int id) => MajorNamespace(WatcherName, id.ToString());
public static readonly string AllWatcher = MajorNamespace(WatcherName, All);
public static string MajorNamespace(params string[] args) => Namespace(MajorSep, args);
public static string MinorNamespace(params string[] args) => Namespace(MinorSep, args);
public static string Namespace(char separator, params string[] args) => string.Join(separator, args);
public static string[] UnMajorNamespace(string arg) => UnNamespace(arg, MajorSep);
public static string[] UnMinorNamespace(string arg) => UnNamespace(arg, MinorSep);
public static string[] UnNamespace(string key, params char[] args) => key.Split(args);
public static string Param(string key) => UnMajorNamespace(key).Skip(1).First();
public static (string, string) ParamPair(string key) {
var split = UnMajorNamespace(key);
return (split[1], split[2]);
}
public static (string, string, string) ParamTriplet(string key)
{
var split = UnMajorNamespace(key);
return (split[1], split[2], split[3]);
}
}
}

View File

@ -45,9 +45,9 @@ namespace Selector.Cache
{
if (string.IsNullOrWhiteSpace(username)) throw new ArgumentNullException("No username provided");
var trackCache = Cache?.StringGetAsync(Key.TrackPlayCount(track, artist));
var albumCache = Cache?.StringGetAsync(Key.AlbumPlayCount(album, albumArtist));
var artistCache = Cache?.StringGetAsync(Key.ArtistPlayCount(artist));
var trackCache = Cache?.StringGetAsync(Key.TrackPlayCount(username, track, artist));
var albumCache = Cache?.StringGetAsync(Key.AlbumPlayCount(username, album, albumArtist));
var artistCache = Cache?.StringGetAsync(Key.ArtistPlayCount(username, artist));
var userCache = Cache?.StringGetAsync(Key.UserPlayCount(username));
var cacheTasks = new Task[] { trackCache, albumCache, artistCache, userCache };

View File

@ -0,0 +1,50 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Selector.Events;
namespace Selector.Model.Events
{
public class UserEventBus: IEventBus
{
private readonly ILogger<UserEventBus> Logger;
public event EventHandler<ApplicationUser> UserChange;
public event EventHandler<ApplicationUser> SpotifyLinkChange;
public event EventHandler<ApplicationUser> LastfmCredChange;
public event EventHandler<(string, CurrentlyPlayingDTO)> CurrentlyPlaying;
public UserEventBus(ILogger<UserEventBus> logger)
{
Logger = logger;
}
public void OnUserChange(object sender, ApplicationUser args)
{
Logger.LogTrace("Firing user event [{usernamne}]", args?.UserName);
UserChange?.Invoke(sender, args);
}
public void OnSpotifyLinkChange(object sender, ApplicationUser args)
{
Logger.LogTrace("Firing user Spotify event [{usernamne}]", args?.UserName);
SpotifyLinkChange?.Invoke(sender, args);
}
public void OnLastfmCredChange(object sender, ApplicationUser args)
{
Logger.LogTrace("Firing user Last.fm event [{usernamne}]", args?.UserName);
LastfmCredChange?.Invoke(sender, args);
}
public void OnCurrentlyPlayingChange(object sender, string userId, CurrentlyPlayingDTO args)
{
Logger.LogTrace("Firing currently playing event [{usernamne}/{userId}]", args?.Username, userId);
CurrentlyPlaying?.Invoke(sender, (userId, args));
}
}
}

View File

@ -1,17 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.DependencyInjection;
using Selector.Events;
using Selector.Model.Authorisation;
using Selector.Model.Events;
namespace Selector.Model.Extensions
{
public static class ServiceExtensions
{
public static void AddModelEventBus(this IServiceCollection services)
{
services.AddSingleton<UserEventBus>();
services.AddSingleton<IEventBus, UserEventBus>(sp => sp.GetService<UserEventBus>());
}
public static void AddAuthorisationHandlers(this IServiceCollection services)
{
services.AddAuthorization(options =>

View File

@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Selector.Web.Service;
using Selector.Web.Hubs;
namespace Selector.Web.Extensions
{
@ -7,11 +8,14 @@ namespace Selector.Web.Extensions
{
public static void AddCacheHubProxy(this IServiceCollection services)
{
services.AddSingleton<CacheHubProxy>();
services.AddHostedService<CacheHubProxyService>();
services.AddScoped<EventHubProxy>();
services.AddHostedService<CacheEventProxyService>();
services.AddTransient<INowPlayingMappingFactory, NowPlayingMappingFactory>();
services.AddScoped<IUserMapping, NowPlayingUserMapping>();
services.AddTransient<ICacheEventMapping, NowPlayingCacheMapping>();
services.AddTransient<NowPlayingCacheMapping>();
services.AddScoped<IEventHubMapping<NowPlayingHub, INowPlayingHubClient>, NowPlayingHubMapping>();
services.AddScoped<NowPlayingHubMapping>();
}
}
}

View File

@ -8,33 +8,38 @@ using Microsoft.Extensions.Logging;
namespace Selector.Web.Service
{
public class CacheHubProxyService: IHostedService
public class CacheEventProxyService: IHostedService
{
private readonly ILogger<CacheHubProxyService> Logger;
private readonly CacheHubProxy Proxy;
private readonly ILogger<CacheEventProxyService> Logger;
private readonly IServiceScopeFactory ScopeFactory;
public CacheHubProxyService(
ILogger<CacheHubProxyService> logger,
CacheHubProxy proxy,
private readonly IEnumerable<ICacheEventMapping> CacheEvents;
public CacheEventProxyService(
ILogger<CacheEventProxyService> logger,
IEnumerable<ICacheEventMapping> mappings,
IServiceScopeFactory scopeFactory
)
{
Logger = logger;
Proxy = proxy;
ScopeFactory = scopeFactory;
CacheEvents = mappings;
}
public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("Starting cache hub proxy");
Logger.LogInformation("Starting cache event proxy");
using(var scope = ScopeFactory.CreateScope())
{
foreach(var mapping in scope.ServiceProvider.GetServices<IUserMapping>())
{
mapping.FormAll();
}
foreach (var mapping in CacheEvents)
{
mapping.ConstructMapping();
}
using (var scope = ScopeFactory.CreateScope())
{
var hubProxy = scope.ServiceProvider.GetRequiredService<EventHubProxy>();
hubProxy.FormMappings();
}
return Task.CompletedTask;

View File

@ -1,38 +0,0 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.SignalR;
using StackExchange.Redis;
using Selector.Web.Hubs;
namespace Selector.Web.Service
{
public class CacheHubProxy
{
private readonly ILogger<CacheHubProxy> Logger;
private readonly ISubscriber Subscriber;
private readonly IServiceProvider Services;
public CacheHubProxy(ILogger<CacheHubProxy> logger,
ISubscriber subscriber,
IServiceProvider services
)
{
Logger = logger;
Subscriber = subscriber;
Services = services;
}
public void FormMapping<THub, T>(ICacheHubMapping<THub, T> mapping) where THub: Hub<T> where T: class
{
var context = Services.GetService<IHubContext<THub, T>>();
mapping.ConstructMapping(Subscriber, context);
}
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using StackExchange.Redis;
namespace Selector.Web.Service
{
public interface ICacheEventMapping
{
public Task ConstructMapping();
}
}

View File

@ -0,0 +1,52 @@
using System;
using System.Text.Json;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Selector.Web.Hubs;
using Selector.Cache;
using Selector.Model.Events;
namespace Selector.Web.Service
{
public class NowPlayingCacheMapping : ICacheEventMapping
{
private readonly ILogger<NowPlayingCacheMapping> Logger;
private readonly ISubscriber Subscriber;
private readonly UserEventBus UserEvent;
public NowPlayingCacheMapping(ILogger<NowPlayingCacheMapping> logger,
ISubscriber subscriber,
UserEventBus userEvent)
{
Logger = logger;
Subscriber = subscriber;
UserEvent = userEvent;
}
public async Task ConstructMapping()
{
Logger.LogDebug("Forming now playing event mapping between cache and event bus");
(await Subscriber.SubscribeAsync(Key.AllCurrentlyPlaying)).OnMessage(message => {
try{
var userId = Key.Param(message.Channel);
var deserialised = JsonSerializer.Deserialize<CurrentlyPlayingDTO>(message.Message);
Logger.LogDebug("Received new currently playing [{username}]", deserialised.Username);
UserEvent.OnCurrentlyPlayingChange(this, userId, deserialised);
}
catch(Exception e)
{
Logger.LogError(e, $"Error parsing new currently playing [{message}]");
}
});
}
}
}

View File

@ -0,0 +1,26 @@
using Microsoft.Extensions.Logging;
namespace Selector.Web.Service
{
public class EventHubProxy
{
private readonly ILogger<EventHubProxy> Logger;
private readonly NowPlayingHubMapping NowPlayingMapping;
public EventHubProxy(ILogger<EventHubProxy> logger,
NowPlayingHubMapping nowPlayingMapping
)
{
Logger = logger;
NowPlayingMapping = nowPlayingMapping;
}
public void FormMappings()
{
Logger.LogDebug("Forming event mappings between event bus and SignalR hubs");
NowPlayingMapping.ConstructMapping();
}
}
}

View File

@ -0,0 +1,15 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using StackExchange.Redis;
namespace Selector.Web.Service
{
public interface IEventHubMapping<THub, T>
where THub : Hub<T>
where T : class
{
public Task ConstructMapping();
// public Task RemoveMapping();
}
}

View File

@ -0,0 +1,40 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Selector.Web.Hubs;
using Selector.Model.Events;
namespace Selector.Web.Service
{
public class NowPlayingHubMapping: IEventHubMapping<NowPlayingHub, INowPlayingHubClient>
{
private readonly ILogger<NowPlayingHubMapping> Logger;
private readonly UserEventBus UserEvent;
private readonly IHubContext<NowPlayingHub, INowPlayingHubClient> Hub;
public NowPlayingHubMapping(ILogger<NowPlayingHubMapping> logger,
UserEventBus userEvent,
IHubContext<NowPlayingHub, INowPlayingHubClient> hub)
{
Logger = logger;
UserEvent = userEvent;
Hub = hub;
}
public Task ConstructMapping()
{
Logger.LogDebug("Forming now playing event mapping between event bus and SignalR hub");
UserEvent.CurrentlyPlaying += async (o, args) =>
{
(string id, CurrentlyPlayingDTO e) = args;
Logger.LogDebug("Passing now playing event to SignalR hub [{userId}]", id);
await Hub.Clients.User(id).OnNewPlaying(e);
};
return Task.CompletedTask;
}
}
}

View File

@ -1,16 +0,0 @@
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using StackExchange.Redis;
namespace Selector.Web.Service
{
public interface ICacheHubMapping<THub, T>
where THub : Hub<T>
where T : class
{
public Task ConstructMapping(ISubscriber subscriber, IHubContext<THub, T> hub);
// public Task RemoveMapping(ISubscriber subscriber, THub hub);
}
}

View File

@ -1,46 +0,0 @@
using System;
using System.Text.Json;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Selector.Web.Hubs;
using Selector.Cache;
namespace Selector.Web.Service
{
public class NowPlayingMapping : ICacheHubMapping<NowPlayingHub, INowPlayingHubClient>
{
private readonly ILogger<NowPlayingMapping> Logger;
private readonly string UserId;
private readonly string Username;
public NowPlayingMapping(ILogger<NowPlayingMapping> logger, string userId, string username)
{
Logger = logger;
UserId = userId;
Username = username;
}
public async Task ConstructMapping(ISubscriber subscriber, IHubContext<NowPlayingHub, INowPlayingHubClient> hub)
{
var key = Key.CurrentlyPlaying(UserId);
(await subscriber.SubscribeAsync(key)).OnMessage(async message => {
try{
var trimmedMessage = message.ToString().Substring(key.Length + 1);
var deserialised = JsonSerializer.Deserialize<CurrentlyPlayingDTO>(trimmedMessage);
Logger.LogDebug($"Received new currently playing [{deserialised.Username}] [{deserialised.Username}]");
await hub.Clients.User(UserId).OnNewPlaying(deserialised);
}
catch(Exception e)
{
Logger.LogError(e, $"Error parsing new currently playing [{message}]");
}
});
}
}
}

View File

@ -1,38 +0,0 @@
using System;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Selector.Web.Hubs;
namespace Selector.Web.Service
{
public interface IUserMappingFactory<out TMap, THub, T>
where TMap : ICacheHubMapping<THub, T>
where THub : Hub<T>
where T : class
{
public TMap Get(string userId, string username);
}
public interface INowPlayingMappingFactory: IUserMappingFactory<NowPlayingMapping, NowPlayingHub, INowPlayingHubClient>
{ }
public class NowPlayingMappingFactory : INowPlayingMappingFactory {
private readonly ILoggerFactory LoggerFactory;
public NowPlayingMappingFactory(ILoggerFactory loggerFactory)
{
LoggerFactory = loggerFactory;
}
public NowPlayingMapping Get(string userId, string username)
{
return new NowPlayingMapping(
LoggerFactory?.CreateLogger<NowPlayingMapping>(),
userId,
username
);
}
}
}

View File

@ -1,37 +0,0 @@
using System;
using System.Linq;
using Selector.Model;
namespace Selector.Web.Service
{
public interface IUserMapping {
public void FormAll();
}
public class NowPlayingUserMapping: IUserMapping
{
private readonly ApplicationDbContext Db;
private readonly CacheHubProxy Proxy;
private readonly INowPlayingMappingFactory NowPlayingMappingFactory;
public NowPlayingUserMapping(
ApplicationDbContext db,
CacheHubProxy proxy,
INowPlayingMappingFactory nowPlayingMappingFactory
)
{
Db = db;
Proxy = proxy;
NowPlayingMappingFactory = nowPlayingMappingFactory;
}
public void FormAll()
{
foreach(var user in Db.Users)
{
Proxy.FormMapping(NowPlayingMappingFactory.Get(user.Id, user.UserName));
}
}
}
}

View File

@ -48,6 +48,7 @@ namespace Selector.Web
services.AddRazorPages().AddRazorRuntimeCompilation();
services.AddControllers();
services.AddSignalR(o => o.EnableDetailedErrors = true);
services.AddHttpClient();
services.AddDbContext<ApplicationDbContext>(options =>
options.UseNpgsql(Configuration.GetConnectionString("Default"))
@ -92,6 +93,7 @@ namespace Selector.Web
});
services.AddAuthorisationHandlers();
services.AddModelEventBus();
if (config.RedisOptions.Enabled)
services.AddRedisServices(config.RedisOptions.ConnectionString);

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Selector.Events
{
public interface IEventBus
{
}
}