Skip to content

Commit

Permalink
New RollUpByTenant() feature for multi-stream projections. Closes GH-…
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed May 21, 2024
1 parent 4637b95 commit 60b4a52
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
21 changes: 21 additions & 0 deletions docs/events/projections/multi-stream-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,27 @@ public class UserGroupsAssignmentProjection: MultiStreamProjection<UserGroupsAss
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Projections/MultiStreamProjections/CustomGroupers/custom_slicer.cs#L16-L59' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_view-projection-custom-slicer' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Rollup by Tenant Id <Badge type="tip" text="7.15.0" />

::: info
This feature was built specifically for a [JasperFx](https://jasperfx.net) client who indeed had this use case in their system
:::

Let's say that your are using conjoined tenancy within your event storage, but want to create some kind of summarized roll up
document per tenant id in a projected document -- like maybe the number of open "accounts" or "issues" or "users."

To do that, there's a recipe for the "event slicing" in multi-stream projections with Marten to just group by the event's
tenant id and make that the identity of the projected document. That usage is shown below:

snippet: sample_rollup_projection_by_tenant_id

Do note that you'll probably also need this flag in your configuration:

```cs
// opts is a StoreOptions object
opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true;
```

## Event "Fan Out" Rules

The `ViewProjection` also provides the ability to "fan out" child events from a parent event into the segment of events being used to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using JasperFx.Core;
using Marten;
using Marten.Events.Projections;
using Marten.Schema;
using Marten.Storage;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Projections.MultiStreamProjections;

public class rolling_up_by_tenant : OneOffConfigurationsContext
{
[Fact]
public async Task track_totals_by_tenant_id()
{
StoreOptions(opts =>
{
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Projections.Add<RollupProjection>(ProjectionLifecycle.Async);
opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true;
});

var session1 = theStore.LightweightSession("one");
session1.Events.StartStream(new AEvent(), new BEvent());
session1.Events.StartStream(new BEvent(), new BEvent());
session1.Events.StartStream(new BEvent(), new BEvent());
await session1.SaveChangesAsync();

var session2 = theStore.LightweightSession("two");
session2.Events.StartStream(new AEvent(), new AEvent());
session2.Events.StartStream(new BEvent(), new AEvent());
session2.Events.StartStream(new BEvent(), new BEvent());
await session2.SaveChangesAsync();

var session3 = theStore.LightweightSession("three");
session3.Events.StartStream(new AEvent(), new AEvent());
session3.Events.StartStream(new AEvent(), new AEvent());
session3.Events.StartStream(new BEvent(), new BEvent());
await session3.SaveChangesAsync();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await daemon.WaitForNonStaleData(30.Seconds());

(await theSession.LoadAsync<Rollup>("one")).ACount.ShouldBe(1);
(await theSession.LoadAsync<Rollup>("two")).ACount.ShouldBe(3);
(await theSession.LoadAsync<Rollup>("three")).ACount.ShouldBe(4);
}
}

#region sample_rollup_projection_by_tenant_id

public class RollupProjection: MultiStreamProjection<Rollup, string>
{
public RollupProjection()
{
// This opts into doing the event slicing by tenant id
RollUpByTenant();
}

public void Apply(Rollup state, AEvent e) => state.ACount++;
public void Apply(Rollup state, BEvent e) => state.BCount++;
}

public class Rollup
{
[Identity]
public string TenantId { get; set; }
public int ACount { get; set; }
public int BCount { get; set; }
}

#endregion
35 changes: 35 additions & 0 deletions src/Marten/Events/Projections/MultiStreamProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Threading.Tasks;
using JasperFx.CodeGeneration;
using JasperFx.Core.Reflection;
using Marten.Events.Aggregation;
Expand All @@ -12,6 +13,28 @@

namespace Marten.Events.Projections;

public class TenantRollupSlicer<TDoc>: IEventSlicer<TDoc, string>
{
public ValueTask<IReadOnlyList<EventSlice<TDoc, string>>> SliceInlineActions(IQuerySession querySession, IEnumerable<StreamAction> streams)
{
throw new NotSupportedException("This is not supported in Inline projections");
}

public ValueTask<IReadOnlyList<TenantSliceGroup<TDoc, string>>> SliceAsyncEvents(IQuerySession querySession, List<IEvent> events)
{
var sliceGroup = new TenantSliceGroup<TDoc, string>(new Tenant(Tenancy.DefaultTenantId, querySession.Database));
var groups = events.GroupBy(x => x.TenantId);
foreach (var @group in groups)
{
sliceGroup.AddEvents(@group.Key, @group);
}

var list = new List<TenantSliceGroup<TDoc, string>>{sliceGroup};

return ValueTask.FromResult<IReadOnlyList<TenantSliceGroup<TDoc, string>>>(list);
}
}

/// <summary>
/// Project a single document view across events that may span across
/// event streams in a user-defined grouping
Expand All @@ -28,6 +51,18 @@ protected MultiStreamProjection(): base(AggregationScope.MultiStream)
{
}

/// <summary>
/// Group events by the tenant id. Use this option if you need to do roll up summaries by
/// tenant id within a conjoined multi-tenanted event store.
/// </summary>
public void RollUpByTenant()
{
if (typeof(TId) != typeof(string))
throw new InvalidOperationException("Rolling up by Tenant Id requires the identity type to be string");

_customSlicer = (IEventSlicer<TDoc, TId>)new TenantRollupSlicer<TDoc>();
}

internal IEventSlicer<TDoc, TId> Slicer => _customSlicer ?? _defaultSlicer;

protected override Type[] determineEventTypes()
Expand Down

0 comments on commit 60b4a52

Please sign in to comment.