Skip to content

Commit

Permalink
Carrying through the tenant id to tombstone publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Jun 24, 2024
1 parent db185ba commit 1325c2c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
9 changes: 6 additions & 3 deletions src/Marten/Events/EventGraph.Processing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public partial class EventGraph

private async Task executeTombstoneBlock(UpdateBatch batch, CancellationToken cancellationToken)
{
await using var session = (DocumentSessionBase)_store.LightweightSession();
await using var session = (DocumentSessionBase)(batch.TenantId.IsEmpty() ? _store.LightweightSession() : _store.LightweightSession(batch.TenantId!));
await session.ExecuteBatchAsync(batch, cancellationToken).ConfigureAwait(false);
}

Expand Down Expand Up @@ -174,7 +174,10 @@ internal bool TryCreateTombstoneBatch(DocumentSessionBase session, out UpdateBat

operations.AddRange(tombstones);

batch = new UpdateBatch(operations);
batch = new UpdateBatch(operations)
{
TenantId = session.TenantId
};

return true;
}
Expand All @@ -187,7 +190,7 @@ internal void PostTombstones(UpdateBatch tombstoneBatch)
{
try
{
using var session = (DocumentSessionBase)_store.LightweightSession();
using var session = (DocumentSessionBase)_store.LightweightSession(tombstoneBatch.TenantId);
session.ExecuteBatch(tombstoneBatch);
}
catch (Exception)
Expand Down
53 changes: 26 additions & 27 deletions src/Marten/Internal/UpdateBatch.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core.Exceptions;
using JasperFx.Core.Reflection;
using Marten.Exceptions;
using Marten.Internal.Operations;
using Marten.Internal.Sessions;

Expand All @@ -21,9 +17,34 @@ public UpdateBatch(IReadOnlyList<IStorageOperation> operations)
_operations = operations;
}

public string? TenantId { get; set; }

public IReadOnlyList<Type> DocumentTypes()
{
return _operations.Select(x => x.DocumentType).Where(x => x != null).Distinct().ToList();
}

public Task PostUpdateAsync(IMartenSession session)
{
return Task.CompletedTask;
}

public Task PreUpdateAsync(IMartenSession session)
{
return Task.CompletedTask;
}

public IReadOnlyList<OperationPage> BuildPages(IMartenSession session)
{
return buildPages(session).ToList();
}

private IEnumerable<OperationPage> buildPages(IMartenSession session)
{
if (!_operations.Any()) yield break;
if (!_operations.Any())
{
yield break;
}

if (_operations.Count < session.Options.UpdateBatchSize)
{
Expand All @@ -47,26 +68,4 @@ private IEnumerable<OperationPage> buildPages(IMartenSession session)
}
}
}

public IReadOnlyList<Type> DocumentTypes()
{
return _operations.Select(x => x.DocumentType).Where(x => x != null).Distinct().ToList();
}

public Task PostUpdateAsync(IMartenSession session)
{
return Task.CompletedTask;
}

public Task PreUpdateAsync(IMartenSession session)
{
return Task.CompletedTask;
}

public IReadOnlyList<OperationPage> BuildPages(IMartenSession session)
{
return buildPages(session).ToList();
}


}

0 comments on commit 1325c2c

Please sign in to comment.