diff --git a/src/main/Hangfire.Storage.SQLite/ExpirationManager.cs b/src/main/Hangfire.Storage.SQLite/ExpirationManager.cs index e3cb2bd..1627a6e 100644 --- a/src/main/Hangfire.Storage.SQLite/ExpirationManager.cs +++ b/src/main/Hangfire.Storage.SQLite/ExpirationManager.cs @@ -105,10 +105,8 @@ private int RemoveExpireRows(HangfireDbContext db, string table) try { - var _lock = new SQLiteDistributedLock(DistributedLockKey, DefaultLockTimeout, - db, db.StorageOptions); - - using (_lock) + using (SQLiteDistributedLock.Acquire(DistributedLockKey, DefaultLockTimeout, + db, db.StorageOptions)) { rowsAffected = db.Database.Execute(deleteScript); } diff --git a/src/main/Hangfire.Storage.SQLite/Hangfire.Storage.SQLite.csproj b/src/main/Hangfire.Storage.SQLite/Hangfire.Storage.SQLite.csproj index 7c4da02..b5601ba 100644 --- a/src/main/Hangfire.Storage.SQLite/Hangfire.Storage.SQLite.csproj +++ b/src/main/Hangfire.Storage.SQLite/Hangfire.Storage.SQLite.csproj @@ -7,7 +7,7 @@ netstandard2.0;net48 - 0.4.1 + 0.4.2 RaisedApp RaisedApp Copyright © 2019 - Present @@ -20,8 +20,12 @@ Hangfire Storage SQLite An Alternative SQLite Storage for Hangfire - 0.4.1 - - Stability and retry enhancements introduced by: Daniel Lindblom + 0.4.2 + -remove re-entrancy (fixes SQLiteDistributedLock doesn't play right with async #68). Thanks to @kirides + -pause heartbeat timer while processing. Thanks to @kirides + -update expiration using SQL Update statement in a single step. Thanks to @kirides + -Added Heartbeat event (for testing). Thanks to @kirides + -if we no longer own the lock, we immediately dispose the heartbeat timer (fixes Unable to update heartbeat - still happening in .NET 6.0 #69). Thanks to @kirides diff --git a/src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs b/src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs index 0c23d29..48668f3 100644 --- a/src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs +++ b/src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs @@ -47,7 +47,7 @@ public override void Dispose() public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout) { return Retry.Twice((_) => - new SQLiteDistributedLock($"HangFire:{resource}", timeout, DbContext, _storageOptions) + SQLiteDistributedLock.Acquire($"HangFire:{resource}", timeout, DbContext, _storageOptions) ); } diff --git a/src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs b/src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs index ef4b534..6b78114 100644 --- a/src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs +++ b/src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs @@ -2,7 +2,7 @@ using Hangfire.Storage.SQLite.Entities; using SQLite; using System; -using System.Collections.Generic; +using System.Diagnostics; using System.Threading; namespace Hangfire.Storage.SQLite @@ -14,9 +14,6 @@ public class SQLiteDistributedLock : IDisposable { private static readonly ILog Logger = LogProvider.For(); - private static readonly ThreadLocal> AcquiredLocks - = new ThreadLocal>(() => new Dictionary(StringComparer.OrdinalIgnoreCase)); - private readonly string _resource; private readonly string _resourceKey; @@ -30,15 +27,17 @@ private static readonly ThreadLocal> AcquiredLocks private string EventWaitHandleName => string.Intern($@"{GetType().FullName}.{_resource}"); + public event Action Heartbeat; + /// /// Creates SQLite distributed lock /// /// Lock resource - /// Lock timeout /// Lock database /// Database options /// Thrown if lock is not acuired within the timeout - public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContext database, + private SQLiteDistributedLock(string resource, + HangfireDbContext database, SQLiteStorageOptions storageOptions) { _resource = resource ?? throw new ArgumentNullException(nameof(resource)); @@ -50,22 +49,25 @@ public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContex { throw new ArgumentException($@"The {nameof(resource)} cannot be empty", nameof(resource)); } + } + + public static SQLiteDistributedLock Acquire( + string resource, + TimeSpan timeout, + HangfireDbContext database, + SQLiteStorageOptions storageOptions) + { if (timeout.TotalSeconds > int.MaxValue) { throw new ArgumentException($"The timeout specified is too large. Please supply a timeout equal to or less than {int.MaxValue} seconds", nameof(timeout)); } - if (!AcquiredLocks.Value.ContainsKey(_resource) || AcquiredLocks.Value[_resource] == 0) - { - Cleanup(); - Acquire(timeout); - AcquiredLocks.Value[_resource] = 1; - StartHeartBeat(); - } - else - { - AcquiredLocks.Value[_resource]++; - } + var slock = new SQLiteDistributedLock(resource, database, storageOptions); + + slock.Acquire(timeout); + slock.StartHeartBeat(); + + return slock; } /// @@ -78,96 +80,52 @@ public void Dispose() { return; } + _completed = true; + _heartbeatTimer?.Dispose(); + Release(); + } - if (!AcquiredLocks.Value.ContainsKey(_resource)) + private bool TryAcquireLock() + { + Cleanup(); + try { - return; - } - - AcquiredLocks.Value[_resource]--; + var distributedLock = new DistributedLock + { + Id = Guid.NewGuid().ToString(), + Resource = _resource, + ResourceKey = _resourceKey, + ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime) + }; - if (AcquiredLocks.Value[_resource] > 0) - { - return; + return _dbContext.Database.Insert(distributedLock) == 1; } - - // Timer callback may be invoked after the Dispose method call, - // but since we use the resource key, we will not disturb other owners. - AcquiredLocks.Value.Remove(_resource); - - if (_heartbeatTimer != null) + catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint) { - _heartbeatTimer.Dispose(); - _heartbeatTimer = null; + return false; } - - Release(); - - Cleanup(); } private void Acquire(TimeSpan timeout) { - try + var sw = Stopwatch.StartNew(); + do { - var isLockAcquired = false; - var now = DateTime.UtcNow; - var lockTimeoutTime = now.Add(timeout); - - while (lockTimeoutTime >= now) + if (TryAcquireLock()) { - Cleanup(); - - lock (EventWaitHandleName) - { - var result = _dbContext.DistributedLockRepository.FirstOrDefault(_ => _.Resource == _resource); - - if (result == null) - { - try - { - var distributedLock = new DistributedLock(); - distributedLock.Id = Guid.NewGuid().ToString(); - distributedLock.Resource = _resource; - distributedLock.ResourceKey = _resourceKey; - distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime); - - _dbContext.Database.Insert(distributedLock); - - // we were able to acquire the lock - break the loop - isLockAcquired = true; - break; - } - catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint) - { - // The lock already exists preventing us from inserting. - continue; - } - } - } - - // we couldn't acquire the lock - wait a bit and try again - var waitTime = (int)timeout.TotalMilliseconds / 10; - lock (EventWaitHandleName) - Monitor.Wait(EventWaitHandleName, waitTime); - - now = DateTime.UtcNow; + return; } - if (!isLockAcquired) + var waitTime = (int) timeout.TotalMilliseconds / 10; + // either wait for the event to be raised, or timeout + lock (EventWaitHandleName) { - throw new DistributedLockTimeoutException(_resource); + Monitor.Wait(EventWaitHandleName, waitTime); } - } - catch (DistributedLockTimeoutException ex) - { - throw ex; - } - catch (Exception ex) - { - throw ex; - } + } while (sw.Elapsed <= timeout); + + throw new DistributedLockTimeoutException(_resource); } /// @@ -179,9 +137,12 @@ private void Release() Retry.Twice((retry) => { // Remove resource lock (if it's still ours) - _dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey); - lock (EventWaitHandleName) - Monitor.Pulse(EventWaitHandleName); + var count = _dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey); + if (count != 0) + { + lock (EventWaitHandleName) + Monitor.Pulse(EventWaitHandleName); + } }); } @@ -192,7 +153,7 @@ private void Cleanup() Retry.Twice((_) => { // Delete expired locks (of any owner) _dbContext.DistributedLockRepository. - Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow); + Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow); }); } catch (Exception ex) @@ -210,27 +171,48 @@ private void StartHeartBeat() _heartbeatTimer = new Timer(state => { + // stop timer + _heartbeatTimer?.Change(Timeout.Infinite, Timeout.Infinite); // Timer callback may be invoked after the Dispose method call, // but since we use the resource key, we will not disturb other owners. try { - var distributedLock = _dbContext.DistributedLockRepository.FirstOrDefault(x => x.Resource == _resource && x.ResourceKey == _resourceKey); - if (distributedLock != null) - { - distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime); - - _dbContext.Database.Update(distributedLock); - } - else + var didUpdate = UpdateExpiration(_dbContext.DistributedLockRepository, DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime)); + Heartbeat?.Invoke(didUpdate); + if (!didUpdate) { Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. The resource is not locked or is locked by another owner.", _resource); + + // if we no longer have a lock, stop the heartbeat immediately + _heartbeatTimer?.Dispose(); + return; } } catch (Exception ex) { Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. {1}", _resource, ex); } + // restart timer + _heartbeatTimer?.Change(timerInterval, timerInterval); }, null, timerInterval, timerInterval); } + + private bool UpdateExpiration(TableQuery tableQuery, DateTime expireAt) + { + var expireColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ExpireAt)).Name; + var resourceColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.Resource)).Name; + var resourceKeyColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ResourceKey)).Name; + var table = tableQuery.Table.TableName; + + var command = tableQuery.Connection.CreateCommand($@"UPDATE ""{table}"" + SET ""{expireColumn}"" = ? + WHERE ""{resourceColumn}"" = ? + AND ""{resourceKeyColumn}"" = ?", + expireAt, + _resource, + _resourceKey); + + return command.ExecuteNonQuery() != 0; + } } -} +} \ No newline at end of file diff --git a/src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs b/src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs index 7fe98ed..7f7adc3 100644 --- a/src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs +++ b/src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs @@ -14,6 +14,24 @@ public class SQLiteStorage : JobStorage, IDisposable private readonly SQLiteDbConnectionFactory _dbConnectionFactory; private readonly SQLiteStorageOptions _storageOptions; + + private readonly Dictionary _features = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + { "Storage.ExtendedApi", false }, + { "Job.Queue", true }, + { "Connection.GetUtcDateTime", false }, + { "Connection.BatchedGetFirstByLowestScoreFromSet", false }, + { "Connection.GetSetContains", true }, + { "Connection.GetSetCount.Limited", false }, + { "BatchedGetFirstByLowestScoreFromSet", false }, + { "Transaction.AcquireDistributedLock", true }, + { "Transaction.CreateJob", true }, + { "Transaction.SetJobParameter", true }, + { "TransactionalAcknowledge:InMemoryFetchedJob", false }, + { "Monitoring.DeletedStateGraphs", false }, + { "Monitoring.AwaitingJobs", false } + }; + private ConcurrentQueue _dbContextPool = new ConcurrentQueue(); /// @@ -113,6 +131,15 @@ private void EnqueueOrPhaseOut(PooledHangfireDbContext dbContext) } } + public override bool HasFeature(string featureId) + { + if (featureId == null) throw new ArgumentNullException(nameof(featureId)); + + return _features.TryGetValue(featureId, out var isSupported) + ? isSupported + : base.HasFeature(featureId); + } + /// /// Returns text representation of the object /// diff --git a/src/samples/WebSample/Program.cs b/src/samples/WebSample/Program.cs index cfc5754..3ad0c0d 100644 --- a/src/samples/WebSample/Program.cs +++ b/src/samples/WebSample/Program.cs @@ -18,7 +18,10 @@ .UseSQLiteStorage("Hangfire.db") .UseHeartbeatPage(checkInterval: TimeSpan.FromSeconds(10)) .UseJobsLogger()); -services.AddHangfireServer(); +services.AddHangfireServer(options => +{ + options.Queues = new[] { "test_queue_1", "default" }; +}); var app = builder.Build(); @@ -27,4 +30,12 @@ RecurringJob.AddOrUpdate("TaskMethod()", (TaskSample t) => t.TaskMethod(), Cron.Minutely); RecurringJob.AddOrUpdate("TaskMethod2()", (TaskSample t) => t.TaskMethod2(null), Cron.Minutely); +var t = app.Services.GetService(); +t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......")); +t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......")); +t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......")); +t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......")); +t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......")); +t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......")); + app.Run(); \ No newline at end of file diff --git a/src/test/Hangfire.Storage.SQLite.Test/SQLiteDistributedLockFacts.cs b/src/test/Hangfire.Storage.SQLite.Test/SQLiteDistributedLockFacts.cs index 62b6e8c..f536ab2 100644 --- a/src/test/Hangfire.Storage.SQLite.Test/SQLiteDistributedLockFacts.cs +++ b/src/test/Hangfire.Storage.SQLite.Test/SQLiteDistributedLockFacts.cs @@ -1,8 +1,10 @@ using Hangfire.Storage.SQLite.Entities; using Hangfire.Storage.SQLite.Test.Utils; using System; +using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Xunit; namespace Hangfire.Storage.SQLite.Test @@ -15,7 +17,7 @@ public void Ctor_ThrowsAnException_WhenResourceIsNull() UseConnection(database => { var exception = Assert.Throws( - () => new SQLiteDistributedLock(null, TimeSpan.Zero, database, new SQLiteStorageOptions())); + () => SQLiteDistributedLock.Acquire(null, TimeSpan.Zero, database, new SQLiteStorageOptions())); Assert.Equal("resource", exception.ParamName); }); @@ -25,7 +27,7 @@ public void Ctor_ThrowsAnException_WhenResourceIsNull() public void Ctor_ThrowsAnException_WhenConnectionIsNull() { var exception = Assert.Throws( - () => new SQLiteDistributedLock("resource1", TimeSpan.Zero, null, new SQLiteStorageOptions())); + () => SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, null, new SQLiteStorageOptions())); Assert.Equal("database", exception.ParamName); } @@ -36,7 +38,7 @@ public void Ctor_SetLock_WhenResourceIsNotLocked() UseConnection(database => { using ( - new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) + SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) { var locksCount = database.DistributedLockRepository.Count(_ => _.Resource == "resource1"); @@ -50,7 +52,7 @@ public void Ctor_SetReleaseLock_WhenResourceIsNotLocked() { UseConnection(database => { - using (new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) { var locksCount = database.DistributedLockRepository.Count(_ => _.Resource == "resource1"); Assert.Equal(1, locksCount); @@ -62,38 +64,49 @@ public void Ctor_SetReleaseLock_WhenResourceIsNotLocked() } [Fact] - public void Ctor_AcquireLockWithinSameThread_WhenResourceIsLocked() + public void Ctor_AcquireLockWithinSameThread_WhenResourceIsLocked_Should_Fail() { UseConnection(database => { - using (new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) { var locksCount = database.DistributedLockRepository.Count(_ => _.Resource == "resource1"); Assert.Equal(1, locksCount); - using (new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) + Assert.Throws(() => { - locksCount = database.DistributedLockRepository.Count(_ => _.Resource == "resource1"); - Assert.Equal(1, locksCount); - } + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) + { + locksCount = database.DistributedLockRepository.Count(_ => _.Resource == "resource1"); + Assert.Equal(1, locksCount); + } + }); } }); } + + private Thread NewBackgroundThread(ThreadStart start) + { + return new Thread(start) + { + IsBackground = true + }; + } [Fact] public void Ctor_ThrowsAnException_WhenResourceIsLocked() { UseConnection(database => { - using (new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) { var locksCount = database.DistributedLockRepository.Count(_ => _.Resource == "resource1"); Assert.Equal(1, locksCount); - var t = new Thread(() => + var t = NewBackgroundThread(() => { Assert.Throws(() => - new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())); + SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())); }); t.Start(); Assert.True(t.Join(5000), "Thread is hanging unexpected"); @@ -106,11 +119,11 @@ public void Ctor_WaitForLock_SignaledAtLockRelease() { var storage = ConnectionUtils.CreateStorage(); using var mre = new ManualResetEventSlim(); - var t = new Thread(() => + var t = NewBackgroundThread(() => { UseConnection(database => { - using (new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions())) { mre.Set(); Thread.Sleep(TimeSpan.FromSeconds(3)); @@ -125,11 +138,13 @@ public void Ctor_WaitForLock_SignaledAtLockRelease() mre.Wait(TimeSpan.FromSeconds(5)); // Record when we try to aquire the lock - var startTime = DateTime.UtcNow; - using (new SQLiteDistributedLock("resource1", TimeSpan.FromSeconds(10), database, new SQLiteStorageOptions())) + var startTime = Stopwatch.StartNew(); + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.FromMinutes(15), database, new SQLiteStorageOptions())) { - Assert.InRange(DateTime.UtcNow - startTime, TimeSpan.Zero, TimeSpan.FromSeconds(5)); + Assert.InRange(startTime.Elapsed, TimeSpan.Zero, TimeSpan.FromSeconds(5)); } + + t.Join(); }, storage); } @@ -143,14 +158,14 @@ public void Ctor_WaitForLock_OnlySingleLockCanBeAcquired() var storage = ConnectionUtils.CreateStorage(); // Spawn multiple threads to race each other. - var threads = Enumerable.Range(0, numThreads).Select(i => new Thread(() => + var threads = Enumerable.Range(0, numThreads).Select(i => NewBackgroundThread(() => { using var connection = storage.CreateAndOpenConnection(); // Wait for the start signal. manualResetEvent.Wait(); // Attempt to acquire the distributed lock. - using (new SQLiteDistributedLock("resource1", TimeSpan.FromSeconds(10), connection, new SQLiteStorageOptions())) + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.FromSeconds(10), connection, new SQLiteStorageOptions())) { // Find out if any other threads managed to acquire the lock. var oldConcurrencyCounter = Interlocked.CompareExchange(ref concurrencyCounter, 1, 0); @@ -174,7 +189,6 @@ public void Ctor_WaitForLock_OnlySingleLockCanBeAcquired() threads.ForEach(t => Assert.True(t.Join(TimeSpan.FromSeconds(120)), "Thread is hanging unexpected")); // All the threads should report success. - Interlocked.MemoryBarrier(); Assert.DoesNotContain(false, success); } @@ -184,7 +198,7 @@ public void Ctor_ThrowsAnException_WhenOptionsIsNull() UseConnection(database => { var exception = Assert.Throws(() => - new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, null)); + SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, null)); Assert.Equal("storageOptions", exception.ParamName); }); @@ -195,7 +209,7 @@ public void Ctor_SetLockExpireAtWorks_WhenResourceIsNotLocked() { UseConnection(database => { - using (new SQLiteDistributedLock("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions() { DistributedLockLifetime = TimeSpan.FromSeconds(3) })) + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.Zero, database, new SQLiteStorageOptions() { DistributedLockLifetime = TimeSpan.FromSeconds(3) })) { DateTime initialExpireAt = DateTime.UtcNow; Thread.Sleep(TimeSpan.FromSeconds(5)); @@ -223,18 +237,85 @@ public void Ctor_SetLockExpireAtWorks_WhenResourceIsLockedAndExpiring() // try to get the lock in the next 10 seconds // ideally, after ~3 seconds, the constructor should succeed - using (new SQLiteDistributedLock("resource1", TimeSpan.FromSeconds(10), database, new SQLiteStorageOptions() { DistributedLockLifetime = TimeSpan.FromSeconds(3) })) + using (SQLiteDistributedLock.Acquire("resource1", TimeSpan.FromSeconds(10), database, new SQLiteStorageOptions() { DistributedLockLifetime = TimeSpan.FromSeconds(3) })) { DistributedLock lockEntry = database.DistributedLockRepository.FirstOrDefault(_ => _.Resource == "resource1"); Assert.NotNull(lockEntry); } }); } + + [Fact] + public async Task Heartbeat_Fires_WithSuccess() + { + await UseConnectionAsync(async database => + { + // try to get the lock in the next 10 seconds + // ideally, after ~3 seconds, the constructor should succeed + using var slock = SQLiteDistributedLock.Acquire("resource1", TimeSpan.FromSeconds(10), database, new SQLiteStorageOptions + { + DistributedLockLifetime = TimeSpan.FromSeconds(3) + }); + var result = await WaitForHeartBeat(slock, TimeSpan.FromSeconds(3)); + Assert.True(result); + }); + } + + [Fact] + public void Heartbeat_Fires_With_Fail_If_Lock_No_Longer_Exists() + { + UseConnection(database => + { + // try to get the lock in the next 10 seconds + // ideally, after ~3 seconds, the constructor should succeed + using var slock = SQLiteDistributedLock.Acquire("resource1", TimeSpan.FromSeconds(10), database, new SQLiteStorageOptions + { + DistributedLockLifetime = TimeSpan.FromSeconds(14) + }); + + using var mre = new ManualResetEventSlim(); + bool? lastResult = null; + slock.Heartbeat += success => + { + lastResult = success; + if (!success) + { + mre.Set(); + } + }; + database.DistributedLockRepository.Delete(x => x.Resource == "resource1"); + mre.Wait(TimeSpan.FromSeconds(10)); + Assert.False(lastResult, "Lock should have not been updated"); + }); + } + + private async Task WaitForHeartBeat(SQLiteDistributedLock slock, TimeSpan timeOut) + { + var tcs = new TaskCompletionSource(); + Action onHeartbeat = success => tcs.TrySetResult(success); + slock.Heartbeat += onHeartbeat; + + try + { + return await tcs.Task.WaitAsync(timeOut); + } + finally + { + slock.Heartbeat -= onHeartbeat; + } + } + private static void UseConnection(Action action, SQLiteStorage storage = null) { using var connection = storage?.CreateAndOpenConnection() ?? ConnectionUtils.CreateConnection(); action(connection); } + + private static async Task UseConnectionAsync(Func func, SQLiteStorage storage = null) + { + using var connection = storage?.CreateAndOpenConnection() ?? ConnectionUtils.CreateConnection(); + await func(connection); + } } } \ No newline at end of file