diff --git a/docs/events/projections/multi-stream-projections.md b/docs/events/projections/multi-stream-projections.md index 53c95b609b..80a404d0ea 100644 --- a/docs/events/projections/multi-stream-projections.md +++ b/docs/events/projections/multi-stream-projections.md @@ -418,6 +418,27 @@ public class UserGroupsAssignmentProjection: MultiStreamProjectionsnippet source | anchor +## Rollup by Tenant Id + +::: info +This feature was built specifically for a [JasperFx](https://jasperfx.net) client who indeed had this use case in their system +::: + +Let's say that your are using conjoined tenancy within your event storage, but want to create some kind of summarized roll up +document per tenant id in a projected document -- like maybe the number of open "accounts" or "issues" or "users." + +To do that, there's a recipe for the "event slicing" in multi-stream projections with Marten to just group by the event's +tenant id and make that the identity of the projected document. That usage is shown below: + +snippet: sample_rollup_projection_by_tenant_id + +Do note that you'll probably also need this flag in your configuration: + +```cs +// opts is a StoreOptions object +opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; +``` + ## Event "Fan Out" Rules The `ViewProjection` also provides the ability to "fan out" child events from a parent event into the segment of events being used to diff --git a/src/EventSourcingTests/Projections/MultiStreamProjections/rolling_up_by_tenant.cs b/src/EventSourcingTests/Projections/MultiStreamProjections/rolling_up_by_tenant.cs new file mode 100644 index 0000000000..2b33c51859 --- /dev/null +++ b/src/EventSourcingTests/Projections/MultiStreamProjections/rolling_up_by_tenant.cs @@ -0,0 +1,77 @@ +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using JasperFx.Core; +using Marten; +using Marten.Events.Projections; +using Marten.Schema; +using Marten.Storage; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Projections.MultiStreamProjections; + +public class rolling_up_by_tenant : OneOffConfigurationsContext +{ + [Fact] + public async Task track_totals_by_tenant_id() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Projections.Add(ProjectionLifecycle.Async); + opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; + }); + + var session1 = theStore.LightweightSession("one"); + session1.Events.StartStream(new AEvent(), new BEvent()); + session1.Events.StartStream(new BEvent(), new BEvent()); + session1.Events.StartStream(new BEvent(), new BEvent()); + await session1.SaveChangesAsync(); + + var session2 = theStore.LightweightSession("two"); + session2.Events.StartStream(new AEvent(), new AEvent()); + session2.Events.StartStream(new BEvent(), new AEvent()); + session2.Events.StartStream(new BEvent(), new BEvent()); + await session2.SaveChangesAsync(); + + var session3 = theStore.LightweightSession("three"); + session3.Events.StartStream(new AEvent(), new AEvent()); + session3.Events.StartStream(new AEvent(), new AEvent()); + session3.Events.StartStream(new BEvent(), new BEvent()); + await session3.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + + await daemon.WaitForNonStaleData(30.Seconds()); + + (await theSession.LoadAsync("one")).ACount.ShouldBe(1); + (await theSession.LoadAsync("two")).ACount.ShouldBe(3); + (await theSession.LoadAsync("three")).ACount.ShouldBe(4); + } +} + +#region sample_rollup_projection_by_tenant_id + +public class RollupProjection: MultiStreamProjection +{ + public RollupProjection() + { + // This opts into doing the event slicing by tenant id + RollUpByTenant(); + } + + public void Apply(Rollup state, AEvent e) => state.ACount++; + public void Apply(Rollup state, BEvent e) => state.BCount++; +} + +public class Rollup +{ + [Identity] + public string TenantId { get; set; } + public int ACount { get; set; } + public int BCount { get; set; } +} + +#endregion diff --git a/src/Marten/Events/Projections/MultiStreamProjection.cs b/src/Marten/Events/Projections/MultiStreamProjection.cs index 00432394a5..652f156e56 100644 --- a/src/Marten/Events/Projections/MultiStreamProjection.cs +++ b/src/Marten/Events/Projections/MultiStreamProjection.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.ComponentModel; using System.Linq; +using System.Threading.Tasks; using JasperFx.CodeGeneration; using JasperFx.Core.Reflection; using Marten.Events.Aggregation; @@ -12,6 +13,28 @@ namespace Marten.Events.Projections; +public class TenantRollupSlicer: IEventSlicer +{ + public ValueTask>> SliceInlineActions(IQuerySession querySession, IEnumerable streams) + { + throw new NotSupportedException("This is not supported in Inline projections"); + } + + public ValueTask>> SliceAsyncEvents(IQuerySession querySession, List events) + { + var sliceGroup = new TenantSliceGroup(new Tenant(Tenancy.DefaultTenantId, querySession.Database)); + var groups = events.GroupBy(x => x.TenantId); + foreach (var @group in groups) + { + sliceGroup.AddEvents(@group.Key, @group); + } + + var list = new List>{sliceGroup}; + + return ValueTask.FromResult>>(list); + } +} + /// /// Project a single document view across events that may span across /// event streams in a user-defined grouping @@ -28,6 +51,18 @@ protected MultiStreamProjection(): base(AggregationScope.MultiStream) { } + /// + /// Group events by the tenant id. Use this option if you need to do roll up summaries by + /// tenant id within a conjoined multi-tenanted event store. + /// + public void RollUpByTenant() + { + if (typeof(TId) != typeof(string)) + throw new InvalidOperationException("Rolling up by Tenant Id requires the identity type to be string"); + + _customSlicer = (IEventSlicer)new TenantRollupSlicer(); + } + internal IEventSlicer Slicer => _customSlicer ?? _defaultSlicer; protected override Type[] determineEventTypes()