From 9a80fe6c67504587cca4e1e70e0a53e9150552c3 Mon Sep 17 00:00:00 2001 From: Frank Wagner Date: Fri, 28 Jul 2023 14:18:51 +0200 Subject: [PATCH] Improvements for eryph (#5) * changes for eryph: passing more data * added serialization config --- .../Workflow/IOperationManager.cs | 10 +++++++--- .../DefaultOperationDispatcher.cs | 6 ++++-- .../Rebus.Operations.Core/OperationDispatcherBase.cs | 2 +- .../OperationTaskDispatcherBase.cs | 3 +-- .../Workflow/OperationManagerBase.cs | 8 +++++--- .../Workflow/OperationTaskProgressEventHandler.cs | 3 ++- .../Workflow/ProcessOperationSaga.cs | 6 ++++-- .../Rebus.Operations.Primitives/WorkflowOptions.cs | 8 ++++++++ test/Rebus.Operations.Tests/TestOperationManager.cs | 8 +++++--- 9 files changed, 37 insertions(+), 17 deletions(-) diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs index fbac907..f915dba 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Dbosoft.Rebus.Operations.Workflow; @@ -7,8 +8,11 @@ namespace Dbosoft.Rebus.Operations.Workflow; public interface IOperationManager { ValueTask GetByIdAsync(Guid operationId); - ValueTask GetOrCreateAsync(Guid operationId, object command); + ValueTask GetOrCreateAsync(Guid operationId, object command, + object? additionalData, IDictionary? additionalHeaders); - ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData); - ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, object? data); + ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData, + IDictionary? messageHeaders); + ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, + object? data, IDictionary? messageHeaders); } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs index 787d932..9737d80 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs @@ -22,8 +22,10 @@ public DefaultOperationDispatcher( _operationManager = operationManager; } - protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData,IDictionary? additionalHeaders) + protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData, + IDictionary? additionalHeaders) { - return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command), command); + return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command, + additionalData,additionalHeaders), command); } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs index 3d090d1..0c48fa0 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs @@ -52,7 +52,7 @@ protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger GetByIdAsync(Guid operationId); - public abstract ValueTask GetOrCreateAsync(Guid operationId, object command); + public abstract ValueTask GetOrCreateAsync(Guid operationId, object command, + object? additionalData,IDictionary? additionalHeaders); public abstract ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, - object? data); + object? data, IDictionary? messageHeaders); public abstract ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, - object? additionalData); + object? additionalData, IDictionary? messageHeaders); diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs index ddc5fbf..2d4f074 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs @@ -3,6 +3,7 @@ using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Rebus.Handlers; +using Rebus.Pipeline; namespace Dbosoft.Rebus.Operations.Workflow { @@ -39,7 +40,7 @@ await _workflow.Operations.AddProgressAsync( message.Timestamp, operation, task, - message.Data); + message.Data, MessageContext.Current.Headers); } else { diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs index fcb5bb1..3444a48 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs @@ -8,6 +8,7 @@ using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Rebus.Handlers; +using Rebus.Pipeline; using Rebus.Sagas; namespace Dbosoft.Rebus.Operations.Workflow @@ -117,7 +118,8 @@ public async Task Handle(OperationTaskAcceptedEvent message) } var opOldStatus = op.Status; - if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, null)) + if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, null, + MessageContext.Current.Headers)) { _log.LogDebug("Operation Workflow {operationId}: Status changed: {oldStatus} -> {newStatus}", message.OperationId, opOldStatus, op.Status); @@ -195,7 +197,7 @@ public async Task Handle(OperationTaskStatusEvent message) if (await _workflow.Operations.TryChangeStatusAsync(op, - newStatus, message.GetMessage())) + newStatus, message.GetMessage(), MessageContext.Current.Headers)) { await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent { diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs index 2083ee4..e3ee2e6 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs @@ -1,9 +1,17 @@ +using System.Text.Json; + namespace Dbosoft.Rebus.Operations; public class WorkflowOptions { + public WorkflowOptions() + { + JsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web); + } + public WorkflowEventDispatchMode DispatchMode { get; set; } = WorkflowEventDispatchMode.Publish; public string? EventDestination { get; set; } public string? OperationsDestination { get; set; } + public JsonSerializerOptions JsonSerializerOptions { get; set; } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestOperationManager.cs b/test/Rebus.Operations.Tests/TestOperationManager.cs index 4951536..7ef81da 100644 --- a/test/Rebus.Operations.Tests/TestOperationManager.cs +++ b/test/Rebus.Operations.Tests/TestOperationManager.cs @@ -21,7 +21,8 @@ public static void Reset() } - public override ValueTask GetOrCreateAsync(Guid operationId, object command) + public override ValueTask GetOrCreateAsync(Guid operationId, object command, + object? additionalData, IDictionary? additionalHeaders) { if (Operations.ContainsKey(operationId)) return GetByIdAsync(operationId)!; @@ -35,7 +36,8 @@ public override ValueTask GetOrCreateAsync(Guid operationId, object return new ValueTask(op); } - public override ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData) + public override ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, + object? additionalData, IDictionary? messageHeaders) { if (!Operations.ContainsKey(operation.Id)) return new ValueTask(false); @@ -46,7 +48,7 @@ public override ValueTask TryChangeStatusAsync(IOperation operation, Opera } public override ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, - object? data) + object? data, IDictionary? messageHeaders) { if(!Progress.ContainsKey(progressId)) Progress.Add(operation.Id, new List());