Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements and utilities for automated testing using the async daemon #3228

Merged
merged 3 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/CoreTests/DocumentCleanerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Events.Daemon.Progress;
using Marten.Schema;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;
using Xunit.Abstractions;

namespace CoreTests;

public class DocumentCleanerTests: OneOffConfigurationsContext
{
private readonly ITestOutputHelper _output;
private IDocumentCleaner theCleaner => theStore.Advanced.Clean;

public DocumentCleanerTests(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task clean_table()
Expand Down Expand Up @@ -135,6 +144,14 @@ public async Task delete_all_event_data()
[Fact]
public async Task delete_all_event_data_async()
{
theSession.Logger = new TestOutputMartenLogger(_output);

theSession.QueueOperation(new InsertProjectionProgress(theStore.Events,
new EventRange(new ShardName("Projection1", "All"), 1000)));

theSession.QueueOperation(new InsertProjectionProgress(theStore.Events,
new EventRange(new ShardName("Projection2", "All"), 1000)));

var streamId = Guid.NewGuid();
theSession.Events.StartStream<Quest>(streamId, new QuestStarted());

Expand All @@ -144,6 +161,9 @@ public async Task delete_all_event_data_async()

theSession.Events.QueryRawEventDataOnly<QuestStarted>().ShouldBeEmpty();
(await theSession.Events.FetchStreamAsync(streamId)).ShouldBeEmpty();

var progress = await theStore.Advanced.AllProjectionProgress();
progress.Any().ShouldBeFalse();
}

private static void ShouldBeEmpty<T>(T[] documentTables)
Expand Down
48 changes: 48 additions & 0 deletions src/DaemonTests/pausing_and_resuming_the_daemon.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using DaemonTests.TestingSupport;
using JasperFx.Core;
using Marten;
using Marten.Events;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Hosting;
using Xunit;
using Xunit.Abstractions;

namespace DaemonTests;

public class pausing_and_resuming_the_daemon
{
[Fact]
public async Task stop_and_resume_from_the_host_extensions()
{
using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddMarten(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "coordinator";

opts.Projections.Add<TestingSupport.TripProjection>(ProjectionLifecycle.Async);
}).AddAsyncDaemon(DaemonMode.Solo);
}).StartAsync();

await host.PauseAllDaemonsAsync();

await host.ResumeAllDaemonsAsync();

await using var session = host.DocumentStore().LightweightSession();
var id = session.Events.StartStream<TestingSupport.TripProjection>(new TripStarted()).Id;

await session.SaveChangesAsync();

await host.WaitForNonStaleProjectionDataAsync(15.Seconds());

var trip = await session.LoadAsync<Trip>(id);
trip.ShouldNotBeNull();
}

}
27 changes: 27 additions & 0 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,38 @@
using JasperFx.Core.Reflection;
using Marten.Events.Daemon;
using Marten.Storage;
using Microsoft.Extensions.Hosting;

namespace Marten.Events;

public static class TestingExtensions
{
/// <summary>
/// Use with caution! This will try to wait for all projections to "catch up" to the currently
/// known farthest known sequence of the event store
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
public static Task WaitForNonStaleProjectionDataAsync(this IHost host, TimeSpan timeout)
{
return host.DocumentStore().WaitForNonStaleProjectionDataAsync(timeout);
}

/// <summary>
/// Wait for any running async daemon for a specific tenant id or database name to catch up to the latest event
/// sequence at the time
/// this method is invoked for all projections. This method is meant to aid in automated testing
/// </summary>
/// <param name="tenantIdOrDatabaseName">Either a tenant id or the name of a database within the system</param>
public static async Task WaitForNonStaleProjectionDataAsync(this IHost host,
string tenantIdOrDatabaseName, TimeSpan timeout)
{
// Assuming there's only one database in this usage
var database = await host.DocumentStore().Storage.FindOrCreateDatabase(tenantIdOrDatabaseName).ConfigureAwait(false);

await database.WaitForNonStaleProjectionDataAsync(timeout).ConfigureAwait(false);
}

/// <summary>
/// Wait for any running async daemons to catch up to the latest event sequence at the time
/// this method is invoked for all projections. This method is meant to aid in automated testing
Expand Down
67 changes: 67 additions & 0 deletions src/Marten/HostExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Marten.Events;
using Marten.Events.Daemon.Coordination;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Marten;

public static class HostExtensions
{
/// <summary>
/// Testing helper to pause all projection daemons in the system and completely
/// disable the daemon projection assignments
/// </summary>
/// <param name="host"></param>
/// <returns></returns>
public static Task PauseAllDaemonsAsync(this IHost host)
{
var coordinator = host.Services.GetRequiredService<IProjectionCoordinator>();
return coordinator.PauseAsync();
}

/// <summary>
/// Testing helper to resume all projection daemons in the system and restart
/// the daemon projection assignments
/// </summary>
/// <param name="host"></param>
/// <returns></returns>
public static Task ResumeAllDaemonsAsync(this IHost host)
{
var coordinator = host.Services.GetRequiredService<IProjectionCoordinator>();
return coordinator.ResumeAsync();
}

/// <summary>
/// Retrieve the Marten document store for this IHost
/// </summary>
/// <param name="host"></param>
/// <returns></returns>
public static IDocumentStore DocumentStore(this IHost host)
{
return host.Services.GetRequiredService<IDocumentStore>();
}

/// <summary>
/// Clean off all Marten data in the default DocumentStore for this host
/// </summary>
/// <param name="host"></param>
public static async Task CleanAllMartenDataAsync(this IHost host)
{
var store = host.DocumentStore();
await store.Advanced.Clean.DeleteAllDocumentsAsync(CancellationToken.None).ConfigureAwait(false);
await store.Advanced.Clean.DeleteAllEventDataAsync(CancellationToken.None).ConfigureAwait(false);
}

/// <summary>
/// Call DocumentStore.ResetAllData() on the document store in this host
/// </summary>
/// <param name="host"></param>
public static Task ResetAllMartenDataAsync(this IHost host)
{
var store = host.DocumentStore();
return store.Advanced.ResetAllData(CancellationToken.None);
}
}
8 changes: 6 additions & 2 deletions src/Marten/Storage/MartenDatabase.DocumentCleaner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using Marten.Events;
using Marten.Exceptions;
using Marten.Internal;
using Marten.Schema;
Expand Down Expand Up @@ -202,13 +203,16 @@ public void DeleteAllEventData()

public async Task DeleteAllEventDataAsync(CancellationToken ct = default)
{
await EnsureStorageExistsAsync(typeof(IEvent), ct).ConfigureAwait(false);

await using var connection = CreateConnection();
await connection.OpenAsync(ct).ConfigureAwait(false);

var tx = await connection.BeginTransactionAsync(ct).ConfigureAwait(false);

var deleteEventDataSql = toDeleteEventDataSql();
await connection.CreateCommand(deleteEventDataSql, tx).ExecuteNonQueryAsync(ct).ConfigureAwait(false);

await tx.CommitAsync(ct).ConfigureAwait(false);
}

Expand Down Expand Up @@ -245,8 +249,8 @@ IF EXISTS(SELECT * FROM information_schema.tables
WHERE table_name = 'mt_streams' AND table_schema = '{Options.Events.DatabaseSchemaName}')
THEN TRUNCATE TABLE {Options.Events.DatabaseSchemaName}.mt_streams CASCADE; END IF;
IF EXISTS(SELECT * FROM information_schema.tables
WHERE table_name = 'mt_mark_event_progression' AND table_schema = '{Options.Events.DatabaseSchemaName}')
THEN delete from {Options.Events.DatabaseSchemaName}.mt_mark_event_progression; END IF;
WHERE table_name = 'mt_event_progression' AND table_schema = '{Options.Events.DatabaseSchemaName}')
THEN delete from {Options.Events.DatabaseSchemaName}.mt_event_progression; END IF;
END; $$;
";
}
Expand Down
Loading