Skip to content

Commit

Permalink
Minor feature tweaks to coordinators
Browse files Browse the repository at this point in the history
  • Loading branch information
Hawxy committed Jan 13, 2024
1 parent b1177d8 commit 4e2bde4
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 52 deletions.
79 changes: 31 additions & 48 deletions src/Marten/Events/Daemon/HotColdCoordinator.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
#nullable enable
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using JasperFx.Core;
using Marten.Services;
using Marten.Storage;
using Microsoft.Extensions.Logging;
using Npgsql;
using Weasel.Core;
using Weasel.Postgresql;
using Timer = System.Timers.Timer;

namespace Marten.Events.Daemon;

/// <summary>
/// Coordinate the async daemon in the case of hot/cold failover
/// where only one node at a time should be running the async daemon
/// </summary>
internal class HotColdCoordinator: INodeCoordinator, ISingleQueryRunner, IDisposable
internal sealed class HotColdCoordinator: INodeCoordinator, ISingleQueryRunner
{
private readonly CancellationTokenSource _cancellation = new();
private readonly IMartenDatabase _database;
private readonly ILogger _logger;
private readonly DaemonSettings _settings;
private NpgsqlConnection _connection;
private Timer _timer;

private NpgsqlConnection? _connection;
private PeriodicTimer? _periodicTimer;

public HotColdCoordinator(IMartenDatabase database, DaemonSettings settings, ILogger logger)
{
Expand All @@ -43,7 +41,7 @@ public Task Start(IProjectionDaemon daemon, CancellationToken token)
return Task.CompletedTask;
}

public IProjectionDaemon Daemon { get; private set; }
public IProjectionDaemon? Daemon { get; private set; }

public async Task Stop()
{
Expand All @@ -66,7 +64,7 @@ public async Task Stop()
public void Dispose()
{
_connection?.SafeDispose();
_timer?.SafeDispose();
_periodicTimer?.SafeDispose();
}

public async Task<T> Query<T>(ISingleQueryHandler<T> handler, CancellationToken cancellation)
Expand All @@ -90,11 +88,11 @@ public async Task<T> Query<T>(ISingleQueryHandler<T> handler, CancellationToken

public async Task SingleCommit(DbCommand command, CancellationToken cancellation)
{
NpgsqlTransaction tx = null;
NpgsqlTransaction? tx = null;

try
{
tx = await _connection.BeginTransactionAsync(cancellation).ConfigureAwait(false);
tx = await _connection!.BeginTransactionAsync(cancellation).ConfigureAwait(false);
command.Connection = _connection;

await command.ExecuteNonQueryAsync(cancellation).ConfigureAwait(false);
Expand All @@ -119,29 +117,37 @@ public async Task SingleCommit(DbCommand command, CancellationToken cancellation

private void startPollingForOwnership()
{
_timer = new Timer { AutoReset = false, Interval = _settings.LeadershipPollingTime };

_timer.Elapsed += TimerOnElapsed;

_timer.Start();
_periodicTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_settings.LeadershipPollingTime));
#pragma warning disable MA0040
_ = Task.Run(async () =>
{
while (await _periodicTimer.WaitForNextTickAsync(_cancellation.Token).ConfigureAwait(false))
{
var attained = await tryToAttainLockAndStartShards().ConfigureAwait(false);
if (attained)
break;
}
_periodicTimer.Dispose();
});
#pragma warning restore MA0040
}

public bool IsPrimary { get; private set; }

private async Task<bool> tryToAttainLockAndStartShards()
{
var gotLock = false;

NpgsqlConnection conn = null;
NpgsqlConnection? conn = null;

try
{
conn = _database.CreateConnection();
await conn.OpenAsync(_cancellation.Token).ConfigureAwait(false);

gotLock = (bool)await conn.CreateCommand("SELECT pg_try_advisory_lock(:id);")
IsPrimary = (bool)(await conn.CreateCommand("SELECT pg_try_advisory_lock(:id);")
.With("id", _settings.DaemonLockId)
.ExecuteScalarAsync(_cancellation.Token).ConfigureAwait(false);
.ExecuteScalarAsync(_cancellation.Token).ConfigureAwait(false))!;

if (!gotLock)
if (!IsPrimary)
{
await conn.CloseAsync().ConfigureAwait(false);
}
Expand All @@ -163,16 +169,15 @@ private async Task<bool> tryToAttainLockAndStartShards()
_database.Identifier);
}

if (gotLock)
if (IsPrimary)
{
_logger.LogInformation(
"Attained lock for the async daemon for database {Database}, attempting to start all shards",
_database.Identifier);

try
{
await startAllProjections(conn).ConfigureAwait(false);
stopPollingForOwnership();
await startAllProjections(conn!).ConfigureAwait(false);

return true;
}
Expand All @@ -185,36 +190,14 @@ private async Task<bool> tryToAttainLockAndStartShards()
await Stop().ConfigureAwait(false);
}
}

if (_timer == null || !_timer.Enabled)
{
startPollingForOwnership();
}
else
{
_timer.Start();
}

return false;
}

private void TimerOnElapsed(object sender, ElapsedEventArgs e)
{
tryToAttainLockAndStartShards().GetAwaiter().GetResult();
}

private void stopPollingForOwnership()
{
_timer.Enabled = false;
_timer.SafeDispose();
_timer = null;
}

private Task startAllProjections(NpgsqlConnection conn)
{
_connection = conn;

return Daemon.StartAllShards();
return Daemon!.StartAllShards();
}

private async Task reopenConnectionIfNecessary(CancellationToken cancellation)
Expand All @@ -229,7 +212,7 @@ private async Task reopenConnectionIfNecessary(CancellationToken cancellation)
var restarted = await tryToAttainLockAndStartShards().ConfigureAwait(false);
if (!restarted)
{
await Daemon.StopAll().ConfigureAwait(false);
await Daemon!.StopAll().ConfigureAwait(false);
startPollingForOwnership();
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/Marten/Events/Daemon/INodeCoordinator.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -14,7 +15,13 @@ public interface INodeCoordinator: IDisposable
/// <summary>
/// Current daemon being controlled
/// </summary>
IProjectionDaemon Daemon { get; }
IProjectionDaemon? Daemon { get; }

/// <summary>
/// Indicates if the current coordinator is responsible for running the async daemon.
/// Will always be true in single-node environments.
/// </summary>
bool IsPrimary { get; }

/// <summary>
/// Called at the start of the application to register the projection
Expand Down
5 changes: 5 additions & 0 deletions src/Marten/Events/Daemon/IProjectionDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public interface IProjectionDaemon: IDisposable
/// </summary>
ShardStateTracker Tracker { get; }

/// <summary>
/// Indicates if this daemon is currently running
/// </summary>
bool IsRunning { get; }

/// <summary>
/// Rebuilds a single projection by projection name inline.
/// Will timeout if a shard takes longer than 5 minutes.
Expand Down
8 changes: 5 additions & 3 deletions src/Marten/Events/Daemon/SoloCoordinator.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -7,9 +8,10 @@ namespace Marten.Events.Daemon;
/// Default projection coordinator, assumes that there is only one
/// single node
/// </summary>
internal class SoloCoordinator: INodeCoordinator
internal sealed class SoloCoordinator: INodeCoordinator
{
public IProjectionDaemon Daemon { get; private set; }
public IProjectionDaemon? Daemon { get; private set; }
public bool IsPrimary => true;

public Task Start(IProjectionDaemon agent, CancellationToken token)
{
Expand All @@ -19,7 +21,7 @@ public Task Start(IProjectionDaemon agent, CancellationToken token)

public Task Stop()
{
return Daemon.StopAll();
return Daemon!.StopAll();
}

public void Dispose()
Expand Down

0 comments on commit 4e2bde4

Please sign in to comment.