adding cache consumers

This commit is contained in:
andy 2021-10-28 23:05:07 +01:00
parent 78fbc27e3a
commit b0467c3df9
13 changed files with 211 additions and 50 deletions

View File

@ -71,7 +71,7 @@ namespace Selector.CLI
enum Consumers enum Consumers
{ {
AudioFeatures AudioFeatures, CacheWriter, Publisher
} }
class DatabaseOptions { class DatabaseOptions {

View File

@ -39,7 +39,7 @@ namespace Selector.CLI
Console.WriteLine("> Adding Services..."); Console.WriteLine("> Adding Services...");
// SERVICES // SERVICES
services.AddSingleton<IWatcherFactory, WatcherFactory>(); services.AddSingleton<IWatcherFactory, WatcherFactory>();
services.AddSingleton<IConsumerFactory, AudioFeatureInjectorFactory>(); services.AddSingleton<IAudioFeatureInjectorFactory, AudioFeatureInjectorFactory>();
services.AddSingleton<IWatcherCollectionFactory, WatcherCollectionFactory>(); services.AddSingleton<IWatcherCollectionFactory, WatcherCollectionFactory>();
// For generating spotify clients // For generating spotify clients
//services.AddSingleton<IRefreshTokenFactoryProvider, RefreshTokenFactoryProvider>(); //services.AddSingleton<IRefreshTokenFactoryProvider, RefreshTokenFactoryProvider>();
@ -68,6 +68,7 @@ namespace Selector.CLI
var connMulti = ConnectionMultiplexer.Connect(config.RedisOptions.ConnectionString); var connMulti = ConnectionMultiplexer.Connect(config.RedisOptions.ConnectionString);
services.AddSingleton(connMulti); services.AddSingleton(connMulti);
services.AddTransient<IDatabaseAsync>(services => services.GetService<ConnectionMultiplexer>().GetDatabase()); services.AddTransient<IDatabaseAsync>(services => services.GetService<ConnectionMultiplexer>().GetDatabase());
services.AddTransient<ISubscriber>(services => services.GetService<ConnectionMultiplexer>().GetSubscriber());
} }
// EQUAL // EQUAL

View File

@ -10,6 +10,9 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Selector.Cache;
using StackExchange.Redis;
namespace Selector.CLI namespace Selector.CLI
{ {
class WatcherService : IHostedService class WatcherService : IHostedService
@ -23,6 +26,9 @@ namespace Selector.CLI
private readonly IWatcherCollectionFactory WatcherCollectionFactory; private readonly IWatcherCollectionFactory WatcherCollectionFactory;
private readonly IRefreshTokenFactoryProvider SpotifyFactory; private readonly IRefreshTokenFactoryProvider SpotifyFactory;
private readonly IDatabaseAsync Cache;
private readonly ISubscriber Subscriber;
private Dictionary<string, IWatcherCollection> Watchers { get; set; } = new(); private Dictionary<string, IWatcherCollection> Watchers { get; set; } = new();
public WatcherService( public WatcherService(
@ -30,7 +36,9 @@ namespace Selector.CLI
IWatcherCollectionFactory watcherCollectionFactory, IWatcherCollectionFactory watcherCollectionFactory,
IRefreshTokenFactoryProvider spotifyFactory, IRefreshTokenFactoryProvider spotifyFactory,
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
IOptions<RootOptions> config IOptions<RootOptions> config,
IDatabaseAsync cache = null,
ISubscriber subscriber = null
) { ) {
Logger = loggerFactory.CreateLogger<WatcherService>(); Logger = loggerFactory.CreateLogger<WatcherService>();
LoggerFactory = loggerFactory; LoggerFactory = loggerFactory;
@ -38,6 +46,8 @@ namespace Selector.CLI
WatcherFactory = watcherFactory; WatcherFactory = watcherFactory;
WatcherCollectionFactory = watcherCollectionFactory; WatcherCollectionFactory = watcherCollectionFactory;
SpotifyFactory = spotifyFactory; SpotifyFactory = spotifyFactory;
Cache = cache;
Subscriber = subscriber;
SpotifyFactory.Initialise(Config.ClientId, Config.ClientSecret); SpotifyFactory.Initialise(Config.ClientId, Config.ClientSecret);
} }
@ -100,8 +110,18 @@ namespace Selector.CLI
switch(consumer) switch(consumer)
{ {
case Consumers.AudioFeatures: case Consumers.AudioFeatures:
var factory = new AudioFeatureInjectorFactory(LoggerFactory); var featureInjector = new AudioFeatureInjectorFactory(LoggerFactory);
consumers.Add(await factory.Get(spotifyFactory)); consumers.Add(await featureInjector.Get(spotifyFactory));
break;
case Consumers.CacheWriter:
var cacheWriter = new CacheWriterFactory(Cache, LoggerFactory);
consumers.Add(await cacheWriter.Get());
break;
case Consumers.Publisher:
var pub = new PublisherFactory(Subscriber, LoggerFactory);
consumers.Add(await pub.Get());
break; break;
} }
} }

View File

@ -9,12 +9,12 @@
"name": "Player Watcher", "name": "Player Watcher",
"type": "player", "type": "player",
"pollperiod": 2000, "pollperiod": 2000,
"consumers": [ "audiofeatures" ] "consumers": [ "audiofeatures", "cachewriter" ]
} }
] ]
}, },
"Database": { "Database": {
"enabled": true "enabled": false
}, },
"Redis": { "Redis": {
"enabled": true "enabled": true

View File

@ -1,28 +1,32 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text.Json;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Abstractions;
using SpotifyAPI.Web; using SpotifyAPI.Web;
using StackExchange.Redis;
namespace Selector.Cache namespace Selector.Cache
{ {
public class CacheUpdater : IConsumer public class CacheWriter : IConsumer
{ {
private readonly IPlayerWatcher Watcher; private readonly IPlayerWatcher Watcher;
private readonly ILogger<CacheUpdater> Logger; private readonly IDatabaseAsync Db;
private readonly ILogger<CacheWriter> Logger;
public CancellationToken CancelToken { get; set; } public CancellationToken CancelToken { get; set; }
public CacheUpdater( public CacheWriter(
IPlayerWatcher watcher, IPlayerWatcher watcher,
ILogger<CacheUpdater> logger = null, IDatabaseAsync db,
ILogger<CacheWriter> logger = null,
CancellationToken token = default CancellationToken token = default
){ ){
Watcher = watcher; Watcher = watcher;
Logger = logger ?? NullLogger<CacheUpdater>.Instance; Db = db;
Logger = logger ?? NullLogger<CacheWriter>.Instance;
CancelToken = token; CancelToken = token;
} }
@ -35,18 +39,8 @@ namespace Selector.Cache
public async Task AsyncCallback(ListeningChangeEventArgs e) public async Task AsyncCallback(ListeningChangeEventArgs e)
{ {
if (e.Current.Item is FullTrack track) var payload = JsonSerializer.Serialize(e);
{ await Db.StringSetAsync(Key.CurrentlyPlaying(e.Username), payload);
}
else if (e.Current.Item is FullEpisode episode)
{
}
else
{
Logger.LogError($"Unknown item pulled from API [{e.Current.Item}]");
}
} }
public void Subscribe(IWatcher watch = null) public void Subscribe(IWatcher watch = null)

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace Selector.Cache
{
public interface ICacheWriterFactory {
public Task<IConsumer> Get(IPlayerWatcher watcher = null);
}
public class CacheWriterFactory: ICacheWriterFactory {
private readonly ILoggerFactory LoggerFactory;
private readonly IDatabaseAsync Cache;
public CacheWriterFactory(
IDatabaseAsync cache,
ILoggerFactory loggerFactory
) {
Cache = cache;
LoggerFactory = loggerFactory;
}
public async Task<IConsumer> Get(IPlayerWatcher watcher = null)
{
return new CacheWriter(
watcher,
Cache,
LoggerFactory.CreateLogger<CacheWriter>()
);
}
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace Selector.Cache
{
public interface IPublisherFactory {
public Task<IConsumer> Get(IPlayerWatcher watcher = null);
}
public class PublisherFactory: IPublisherFactory {
private readonly ILoggerFactory LoggerFactory;
private readonly ISubscriber Subscriber;
public PublisherFactory(
ISubscriber subscriber,
ILoggerFactory loggerFactory
) {
Subscriber = subscriber;
LoggerFactory = loggerFactory;
}
public async Task<IConsumer> Get(IPlayerWatcher watcher = null)
{
return new Publisher(
watcher,
Subscriber,
LoggerFactory.CreateLogger<Publisher>()
);
}
}
}

View File

@ -0,0 +1,74 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using SpotifyAPI.Web;
using StackExchange.Redis;
namespace Selector.Cache
{
public class Publisher : IConsumer
{
private readonly IPlayerWatcher Watcher;
private readonly ISubscriber Subscriber;
private readonly ILogger<Publisher> Logger;
public CancellationToken CancelToken { get; set; }
public Publisher(
IPlayerWatcher watcher,
ISubscriber subscriber,
ILogger<Publisher> logger = null,
CancellationToken token = default
){
Watcher = watcher;
Subscriber = subscriber;
Logger = logger ?? NullLogger<Publisher>.Instance;
CancelToken = token;
}
public void Callback(object sender, ListeningChangeEventArgs e)
{
if (e.Current is null) return;
Task.Run(() => { return AsyncCallback(e); }, CancelToken);
}
public async Task AsyncCallback(ListeningChangeEventArgs e)
{
var payload = JsonSerializer.Serialize(e);
await Subscriber.PublishAsync(Key.CurrentlyPlaying(e.Username), payload);
}
public void Subscribe(IWatcher watch = null)
{
var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided");
if (watcher is IPlayerWatcher watcherCast)
{
watcherCast.ItemChange += Callback;
}
else
{
throw new ArgumentException("Provided watcher is not a PlayerWatcher");
}
}
public void Unsubscribe(IWatcher watch = null)
{
var watcher = watch ?? Watcher ?? throw new ArgumentNullException("No watcher provided");
if (watcher is IPlayerWatcher watcherCast)
{
watcherCast.ItemChange -= Callback;
}
else
{
throw new ArgumentException("Provided watcher is not a PlayerWatcher");
}
}
}
}

View File

@ -110,6 +110,7 @@ namespace Selector.Web
var connMulti = ConnectionMultiplexer.Connect(config.RedisOptions.ConnectionString); var connMulti = ConnectionMultiplexer.Connect(config.RedisOptions.ConnectionString);
services.AddSingleton(connMulti); services.AddSingleton(connMulti);
services.AddTransient<IDatabaseAsync>(services => services.GetService<ConnectionMultiplexer>().GetDatabase()); services.AddTransient<IDatabaseAsync>(services => services.GetService<ConnectionMultiplexer>().GetDatabase());
services.AddTransient<ISubscriber>(services => services.GetService<ConnectionMultiplexer>().GetSubscriber());
} }
} }

View File

@ -8,7 +8,12 @@ using SpotifyAPI.Web;
namespace Selector namespace Selector
{ {
public class AudioFeatureInjectorFactory: IConsumerFactory { public interface IAudioFeatureInjectorFactory
{
public Task<IConsumer> Get(ISpotifyConfigFactory spotifyFactory, IPlayerWatcher watcher);
}
public class AudioFeatureInjectorFactory: IAudioFeatureInjectorFactory {
private readonly ILoggerFactory LoggerFactory; private readonly ILoggerFactory LoggerFactory;

View File

@ -1,10 +0,0 @@
using System;
using System.Threading.Tasks;
namespace Selector
{
public interface IConsumerFactory
{
public Task<IConsumer> Get(ISpotifyConfigFactory spotifyFactory, IPlayerWatcher watcher);
}
}

View File

@ -6,13 +6,15 @@ namespace Selector
public class ListeningChangeEventArgs: EventArgs { public class ListeningChangeEventArgs: EventArgs {
public CurrentlyPlayingContext Previous; public CurrentlyPlayingContext Previous;
public CurrentlyPlayingContext Current; public CurrentlyPlayingContext Current;
public string Username;
public static ListeningChangeEventArgs From(CurrentlyPlayingContext previous, CurrentlyPlayingContext current) public static ListeningChangeEventArgs From(CurrentlyPlayingContext previous, CurrentlyPlayingContext current, string username = null)
{ {
return new ListeningChangeEventArgs() return new ListeningChangeEventArgs()
{ {
Previous = previous, Previous = previous,
Current = current Current = current,
Username = username
}; };
} }
} }

View File

@ -74,14 +74,14 @@ namespace Selector
&& (Live.Item is FullTrack || Live.Item is FullEpisode)) && (Live.Item is FullTrack || Live.Item is FullEpisode))
{ {
Logger.LogDebug($"Playback started: {Live.DisplayString()}"); Logger.LogDebug($"Playback started: {Live.DisplayString()}");
OnPlayingChange(ListeningChangeEventArgs.From(previous, Live)); OnPlayingChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
// STOPPED PLAYBACK // STOPPED PLAYBACK
else if((previous.Item is FullTrack || previous.Item is FullEpisode) else if((previous.Item is FullTrack || previous.Item is FullEpisode)
&& Live is null) && Live is null)
{ {
Logger.LogDebug($"Playback stopped: {previous.DisplayString()}"); Logger.LogDebug($"Playback stopped: {previous.DisplayString()}");
OnPlayingChange(ListeningChangeEventArgs.From(previous, Live)); OnPlayingChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
// CONTINUING PLAYBACK // CONTINUING PLAYBACK
else { else {
@ -92,17 +92,17 @@ namespace Selector
{ {
if(!eq.IsEqual(previousTrack, currentTrack)) { if(!eq.IsEqual(previousTrack, currentTrack)) {
Logger.LogDebug($"Track changed: {previousTrack.DisplayString()} -> {currentTrack.DisplayString()}"); Logger.LogDebug($"Track changed: {previousTrack.DisplayString()} -> {currentTrack.DisplayString()}");
OnItemChange(ListeningChangeEventArgs.From(previous, Live)); OnItemChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
if(!eq.IsEqual(previousTrack.Album, currentTrack.Album)) { if(!eq.IsEqual(previousTrack.Album, currentTrack.Album)) {
Logger.LogDebug($"Album changed: {previousTrack.Album.DisplayString()} -> {currentTrack.Album.DisplayString()}"); Logger.LogDebug($"Album changed: {previousTrack.Album.DisplayString()} -> {currentTrack.Album.DisplayString()}");
OnAlbumChange(ListeningChangeEventArgs.From(previous, Live)); OnAlbumChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
if(!eq.IsEqual(previousTrack.Artists[0], currentTrack.Artists[0])) { if(!eq.IsEqual(previousTrack.Artists[0], currentTrack.Artists[0])) {
Logger.LogDebug($"Artist changed: {previousTrack.Artists.DisplayString()} -> {currentTrack.Artists.DisplayString()}"); Logger.LogDebug($"Artist changed: {previousTrack.Artists.DisplayString()} -> {currentTrack.Artists.DisplayString()}");
OnArtistChange(ListeningChangeEventArgs.From(previous, Live)); OnArtistChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
} }
// CHANGED CONTENT // CHANGED CONTENT
@ -110,8 +110,8 @@ namespace Selector
|| (previous.Item is FullEpisode && Live.Item is FullTrack)) || (previous.Item is FullEpisode && Live.Item is FullTrack))
{ {
Logger.LogDebug($"Media type changed: {previous.Item}, {previous.Item}"); Logger.LogDebug($"Media type changed: {previous.Item}, {previous.Item}");
OnContentChange(ListeningChangeEventArgs.From(previous, Live)); OnContentChange(ListeningChangeEventArgs.From(previous, Live, Username));
OnItemChange(ListeningChangeEventArgs.From(previous, Live)); OnItemChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
// PODCASTS // PODCASTS
else if(previous.Item is FullEpisode previousEp else if(previous.Item is FullEpisode previousEp
@ -119,7 +119,7 @@ namespace Selector
{ {
if(!eq.IsEqual(previousEp, currentEp)) { if(!eq.IsEqual(previousEp, currentEp)) {
Logger.LogDebug($"Podcast changed: {previousEp.DisplayString()} -> {currentEp.DisplayString()}"); Logger.LogDebug($"Podcast changed: {previousEp.DisplayString()} -> {currentEp.DisplayString()}");
OnItemChange(ListeningChangeEventArgs.From(previous, Live)); OnItemChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
} }
else { else {
@ -129,25 +129,25 @@ namespace Selector
// CONTEXT // CONTEXT
if(!eq.IsEqual(previous.Context, Live.Context)) { if(!eq.IsEqual(previous.Context, Live.Context)) {
Logger.LogDebug($"Context changed: {previous.Context.DisplayString()} -> {Live.Context.DisplayString()}"); Logger.LogDebug($"Context changed: {previous.Context.DisplayString()} -> {Live.Context.DisplayString()}");
OnContextChange(ListeningChangeEventArgs.From(previous, Live)); OnContextChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
// DEVICE // DEVICE
if(!eq.IsEqual(previous?.Device, Live?.Device)) { if(!eq.IsEqual(previous?.Device, Live?.Device)) {
Logger.LogDebug($"Device changed: {previous?.Device.DisplayString()} -> {Live?.Device.DisplayString()}"); Logger.LogDebug($"Device changed: {previous?.Device.DisplayString()} -> {Live?.Device.DisplayString()}");
OnDeviceChange(ListeningChangeEventArgs.From(previous, Live)); OnDeviceChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
// IS PLAYING // IS PLAYING
if(previous.IsPlaying != Live.IsPlaying) { if(previous.IsPlaying != Live.IsPlaying) {
Logger.LogDebug($"Playing state changed: {previous.IsPlaying} -> {Live.IsPlaying}"); Logger.LogDebug($"Playing state changed: {previous.IsPlaying} -> {Live.IsPlaying}");
OnPlayingChange(ListeningChangeEventArgs.From(previous, Live)); OnPlayingChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
// VOLUME // VOLUME
if(previous.Device.VolumePercent != Live.Device.VolumePercent) { if(previous.Device.VolumePercent != Live.Device.VolumePercent) {
Logger.LogDebug($"Volume changed: {previous.Device.VolumePercent}% -> {Live.Device.VolumePercent}%"); Logger.LogDebug($"Volume changed: {previous.Device.VolumePercent}% -> {Live.Device.VolumePercent}%");
OnVolumeChange(ListeningChangeEventArgs.From(previous, Live)); OnVolumeChange(ListeningChangeEventArgs.From(previous, Live, Username));
} }
} }
} }