diff --git a/src/Marten/Events/EventGraph.Processing.cs b/src/Marten/Events/EventGraph.Processing.cs index 7983649418..c340e295f4 100644 --- a/src/Marten/Events/EventGraph.Processing.cs +++ b/src/Marten/Events/EventGraph.Processing.cs @@ -20,7 +20,7 @@ public partial class EventGraph private async Task executeTombstoneBlock(UpdateBatch batch, CancellationToken cancellationToken) { - await using var session = (DocumentSessionBase)_store.LightweightSession(); + await using var session = (DocumentSessionBase)(batch.TenantId.IsEmpty() ? _store.LightweightSession() : _store.LightweightSession(batch.TenantId!)); await session.ExecuteBatchAsync(batch, cancellationToken).ConfigureAwait(false); } @@ -174,7 +174,10 @@ internal bool TryCreateTombstoneBatch(DocumentSessionBase session, out UpdateBat operations.AddRange(tombstones); - batch = new UpdateBatch(operations); + batch = new UpdateBatch(operations) + { + TenantId = session.TenantId + }; return true; } @@ -187,7 +190,7 @@ internal void PostTombstones(UpdateBatch tombstoneBatch) { try { - using var session = (DocumentSessionBase)_store.LightweightSession(); + using var session = (DocumentSessionBase)_store.LightweightSession(tombstoneBatch.TenantId); session.ExecuteBatch(tombstoneBatch); } catch (Exception) diff --git a/src/Marten/Internal/UpdateBatch.cs b/src/Marten/Internal/UpdateBatch.cs index e2f54b6724..f92ec2ee24 100644 --- a/src/Marten/Internal/UpdateBatch.cs +++ b/src/Marten/Internal/UpdateBatch.cs @@ -1,11 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading; using System.Threading.Tasks; -using JasperFx.Core.Exceptions; -using JasperFx.Core.Reflection; -using Marten.Exceptions; using Marten.Internal.Operations; using Marten.Internal.Sessions; @@ -21,9 +17,34 @@ public UpdateBatch(IReadOnlyList operations) _operations = operations; } + public string? TenantId { get; set; } + + public IReadOnlyList DocumentTypes() + { + return _operations.Select(x => x.DocumentType).Where(x => x != null).Distinct().ToList(); + } + + public Task PostUpdateAsync(IMartenSession session) + { + return Task.CompletedTask; + } + + public Task PreUpdateAsync(IMartenSession session) + { + return Task.CompletedTask; + } + + public IReadOnlyList BuildPages(IMartenSession session) + { + return buildPages(session).ToList(); + } + private IEnumerable buildPages(IMartenSession session) { - if (!_operations.Any()) yield break; + if (!_operations.Any()) + { + yield break; + } if (_operations.Count < session.Options.UpdateBatchSize) { @@ -47,26 +68,4 @@ private IEnumerable buildPages(IMartenSession session) } } } - - public IReadOnlyList DocumentTypes() - { - return _operations.Select(x => x.DocumentType).Where(x => x != null).Distinct().ToList(); - } - - public Task PostUpdateAsync(IMartenSession session) - { - return Task.CompletedTask; - } - - public Task PreUpdateAsync(IMartenSession session) - { - return Task.CompletedTask; - } - - public IReadOnlyList BuildPages(IMartenSession session) - { - return buildPages(session).ToList(); - } - - }