From 4e2bde4784acb659d21b5667c2d64641cbd23f4a Mon Sep 17 00:00:00 2001 From: JT Date: Sat, 13 Jan 2024 22:00:24 +0800 Subject: [PATCH] Minor feature tweaks to coordinators --- .../Events/Daemon/HotColdCoordinator.cs | 79 ++++++++----------- src/Marten/Events/Daemon/INodeCoordinator.cs | 9 ++- src/Marten/Events/Daemon/IProjectionDaemon.cs | 5 ++ src/Marten/Events/Daemon/SoloCoordinator.cs | 8 +- 4 files changed, 49 insertions(+), 52 deletions(-) diff --git a/src/Marten/Events/Daemon/HotColdCoordinator.cs b/src/Marten/Events/Daemon/HotColdCoordinator.cs index 508eb20391..448d0eff77 100644 --- a/src/Marten/Events/Daemon/HotColdCoordinator.cs +++ b/src/Marten/Events/Daemon/HotColdCoordinator.cs @@ -1,9 +1,9 @@ +#nullable enable using System; using System.Data; using System.Data.Common; using System.Threading; using System.Threading.Tasks; -using System.Timers; using JasperFx.Core; using Marten.Services; using Marten.Storage; @@ -11,7 +11,6 @@ using Npgsql; using Weasel.Core; using Weasel.Postgresql; -using Timer = System.Timers.Timer; namespace Marten.Events.Daemon; @@ -19,15 +18,14 @@ namespace Marten.Events.Daemon; /// Coordinate the async daemon in the case of hot/cold failover /// where only one node at a time should be running the async daemon /// -internal class HotColdCoordinator: INodeCoordinator, ISingleQueryRunner, IDisposable +internal sealed class HotColdCoordinator: INodeCoordinator, ISingleQueryRunner { private readonly CancellationTokenSource _cancellation = new(); private readonly IMartenDatabase _database; private readonly ILogger _logger; private readonly DaemonSettings _settings; - private NpgsqlConnection _connection; - private Timer _timer; - + private NpgsqlConnection? _connection; + private PeriodicTimer? _periodicTimer; public HotColdCoordinator(IMartenDatabase database, DaemonSettings settings, ILogger logger) { @@ -43,7 +41,7 @@ public Task Start(IProjectionDaemon daemon, CancellationToken token) return Task.CompletedTask; } - public IProjectionDaemon Daemon { get; private set; } + public IProjectionDaemon? Daemon { get; private set; } public async Task Stop() { @@ -66,7 +64,7 @@ public async Task Stop() public void Dispose() { _connection?.SafeDispose(); - _timer?.SafeDispose(); + _periodicTimer?.SafeDispose(); } public async Task Query(ISingleQueryHandler handler, CancellationToken cancellation) @@ -90,11 +88,11 @@ public async Task Query(ISingleQueryHandler handler, CancellationToken public async Task SingleCommit(DbCommand command, CancellationToken cancellation) { - NpgsqlTransaction tx = null; + NpgsqlTransaction? tx = null; try { - tx = await _connection.BeginTransactionAsync(cancellation).ConfigureAwait(false); + tx = await _connection!.BeginTransactionAsync(cancellation).ConfigureAwait(false); command.Connection = _connection; await command.ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); @@ -119,29 +117,37 @@ public async Task SingleCommit(DbCommand command, CancellationToken cancellation private void startPollingForOwnership() { - _timer = new Timer { AutoReset = false, Interval = _settings.LeadershipPollingTime }; - - _timer.Elapsed += TimerOnElapsed; - - _timer.Start(); + _periodicTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_settings.LeadershipPollingTime)); +#pragma warning disable MA0040 + _ = Task.Run(async () => + { + while (await _periodicTimer.WaitForNextTickAsync(_cancellation.Token).ConfigureAwait(false)) + { + var attained = await tryToAttainLockAndStartShards().ConfigureAwait(false); + if (attained) + break; + } + _periodicTimer.Dispose(); + }); +#pragma warning restore MA0040 } + public bool IsPrimary { get; private set; } + private async Task tryToAttainLockAndStartShards() { - var gotLock = false; - - NpgsqlConnection conn = null; + NpgsqlConnection? conn = null; try { conn = _database.CreateConnection(); await conn.OpenAsync(_cancellation.Token).ConfigureAwait(false); - gotLock = (bool)await conn.CreateCommand("SELECT pg_try_advisory_lock(:id);") + IsPrimary = (bool)(await conn.CreateCommand("SELECT pg_try_advisory_lock(:id);") .With("id", _settings.DaemonLockId) - .ExecuteScalarAsync(_cancellation.Token).ConfigureAwait(false); + .ExecuteScalarAsync(_cancellation.Token).ConfigureAwait(false))!; - if (!gotLock) + if (!IsPrimary) { await conn.CloseAsync().ConfigureAwait(false); } @@ -163,7 +169,7 @@ private async Task tryToAttainLockAndStartShards() _database.Identifier); } - if (gotLock) + if (IsPrimary) { _logger.LogInformation( "Attained lock for the async daemon for database {Database}, attempting to start all shards", @@ -171,8 +177,7 @@ private async Task tryToAttainLockAndStartShards() try { - await startAllProjections(conn).ConfigureAwait(false); - stopPollingForOwnership(); + await startAllProjections(conn!).ConfigureAwait(false); return true; } @@ -185,36 +190,14 @@ private async Task tryToAttainLockAndStartShards() await Stop().ConfigureAwait(false); } } - - if (_timer == null || !_timer.Enabled) - { - startPollingForOwnership(); - } - else - { - _timer.Start(); - } - return false; } - private void TimerOnElapsed(object sender, ElapsedEventArgs e) - { - tryToAttainLockAndStartShards().GetAwaiter().GetResult(); - } - - private void stopPollingForOwnership() - { - _timer.Enabled = false; - _timer.SafeDispose(); - _timer = null; - } - private Task startAllProjections(NpgsqlConnection conn) { _connection = conn; - return Daemon.StartAllShards(); + return Daemon!.StartAllShards(); } private async Task reopenConnectionIfNecessary(CancellationToken cancellation) @@ -229,7 +212,7 @@ private async Task reopenConnectionIfNecessary(CancellationToken cancellation) var restarted = await tryToAttainLockAndStartShards().ConfigureAwait(false); if (!restarted) { - await Daemon.StopAll().ConfigureAwait(false); + await Daemon!.StopAll().ConfigureAwait(false); startPollingForOwnership(); } } diff --git a/src/Marten/Events/Daemon/INodeCoordinator.cs b/src/Marten/Events/Daemon/INodeCoordinator.cs index fed357e677..9f58f11d19 100644 --- a/src/Marten/Events/Daemon/INodeCoordinator.cs +++ b/src/Marten/Events/Daemon/INodeCoordinator.cs @@ -1,3 +1,4 @@ +#nullable enable using System; using System.Threading; using System.Threading.Tasks; @@ -14,7 +15,13 @@ public interface INodeCoordinator: IDisposable /// /// Current daemon being controlled /// - IProjectionDaemon Daemon { get; } + IProjectionDaemon? Daemon { get; } + + /// + /// Indicates if the current coordinator is responsible for running the async daemon. + /// Will always be true in single-node environments. + /// + bool IsPrimary { get; } /// /// Called at the start of the application to register the projection diff --git a/src/Marten/Events/Daemon/IProjectionDaemon.cs b/src/Marten/Events/Daemon/IProjectionDaemon.cs index 5723886293..d3f9aca0a4 100644 --- a/src/Marten/Events/Daemon/IProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/IProjectionDaemon.cs @@ -15,6 +15,11 @@ public interface IProjectionDaemon: IDisposable /// ShardStateTracker Tracker { get; } + /// + /// Indicates if this daemon is currently running + /// + bool IsRunning { get; } + /// /// Rebuilds a single projection by projection name inline. /// Will timeout if a shard takes longer than 5 minutes. diff --git a/src/Marten/Events/Daemon/SoloCoordinator.cs b/src/Marten/Events/Daemon/SoloCoordinator.cs index f8b42a06d5..f159727369 100644 --- a/src/Marten/Events/Daemon/SoloCoordinator.cs +++ b/src/Marten/Events/Daemon/SoloCoordinator.cs @@ -1,3 +1,4 @@ +#nullable enable using System.Threading; using System.Threading.Tasks; @@ -7,9 +8,10 @@ namespace Marten.Events.Daemon; /// Default projection coordinator, assumes that there is only one /// single node /// -internal class SoloCoordinator: INodeCoordinator +internal sealed class SoloCoordinator: INodeCoordinator { - public IProjectionDaemon Daemon { get; private set; } + public IProjectionDaemon? Daemon { get; private set; } + public bool IsPrimary => true; public Task Start(IProjectionDaemon agent, CancellationToken token) { @@ -19,7 +21,7 @@ public Task Start(IProjectionDaemon agent, CancellationToken token) public Task Stop() { - return Daemon.StopAll(); + return Daemon!.StopAll(); } public void Dispose()