diff --git a/src/ArtemisNetCoreClient/Framing/Outgoing/RollbackMessage.cs b/src/ArtemisNetCoreClient/Framing/Outgoing/RollbackMessage.cs new file mode 100644 index 0000000..6e73d75 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/Outgoing/RollbackMessage.cs @@ -0,0 +1,20 @@ +using ActiveMQ.Artemis.Core.Client.InternalUtilities; + +namespace ActiveMQ.Artemis.Core.Client.Framing.Outgoing; + +internal readonly struct RollbackMessage : IOutgoingPacket +{ + public PacketType PacketType => PacketType.SessionRollbackMessage; + + public required bool ConsiderLastMessageAsDelivered { get; init; } + + public int GetRequiredBufferSize() + { + return sizeof(bool); + } + + public int Encode(Span buffer) + { + return ArtemisBinaryConverter.WriteBool(ref buffer.GetReference(), ConsiderLastMessageAsDelivered); + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/PacketType.cs b/src/ArtemisNetCoreClient/Framing/PacketType.cs index b55e503..e4dcb5b 100644 --- a/src/ArtemisNetCoreClient/Framing/PacketType.cs +++ b/src/ArtemisNetCoreClient/Framing/PacketType.cs @@ -9,6 +9,7 @@ internal enum PacketType : sbyte SessionCreateConsumerMessage = 40, SessionAcknowledgeMessage = 41, SessionCommitMessage = 43, + SessionRollbackMessage = 44, SessionQueueQueryMessage = 45, SessionBindingQueryMessage = 49, SessionStart = 67, @@ -26,5 +27,4 @@ internal enum PacketType : sbyte CreateProducerMessage = -20, RemoveProducerMessage = -21, SessionBindingQueryResponseMessage = -22, - } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs index 81fde7f..c01f620 100644 --- a/src/ArtemisNetCoreClient/ISession.cs +++ b/src/ArtemisNetCoreClient/ISession.cs @@ -49,4 +49,9 @@ public interface ISession : IAsyncDisposable /// Commit the current transaction (any pending sends and acks). /// Task CommitAsync(CancellationToken cancellationToken); + + /// + /// Rollback the current transaction (any pending sends and acks). + /// + Task RollbackAsync(CancellationToken cancellationToken); } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs index 4e252dc..b5e7e65 100644 --- a/src/ArtemisNetCoreClient/Session.cs +++ b/src/ArtemisNetCoreClient/Session.cs @@ -337,6 +337,32 @@ public async Task CommitAsync(CancellationToken cancellationToken) } } + public async Task RollbackAsync(CancellationToken cancellationToken) + { + var request = new RollbackMessage + { + ConsiderLastMessageAsDelivered = false + }; + + await _lock.WaitAsync(cancellationToken); + try + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _ = _completionSources.TryAdd(-1, tcs); + connection.Send(request, ChannelId); + await tcs.Task; + } + catch (Exception) + { + _completionSources.TryRemove(-1, out _); + throw; + } + finally + { + _lock.Release(); + } + } + internal ValueTask RemoveProducerAsync(int producerId) { var request = new RemoveProducerMessage diff --git a/test/ArtemisNetCoreClient.Tests/SessionSpec.cs b/test/ArtemisNetCoreClient.Tests/SessionSpec.cs index 5707004..88dad44 100644 --- a/test/ArtemisNetCoreClient.Tests/SessionSpec.cs +++ b/test/ArtemisNetCoreClient.Tests/SessionSpec.cs @@ -1,5 +1,6 @@ using ActiveMQ.Artemis.Core.Client.Exceptions; using ActiveMQ.Artemis.Core.Client.Tests.Utils; +using NScenario; using Xunit; using Xunit.Abstractions; @@ -183,7 +184,6 @@ public async Task Should_create_multiple_producers_using_the_same_session() Address = addressName }, testFixture.CancellationToken); } - [Fact] public async Task Should_delete_queue() @@ -225,4 +225,55 @@ public async Task Should_not_delete_queue_when_it_does_not_exist() var exception = await Assert.ThrowsAsync(() => session.DeleteQueueAsync(queueName, testFixture.CancellationToken)); Assert.Equal(ActiveMQExceptionType.QueueDoesNotExist, exception.Type); } + + [Fact] + public async Task Should_rollback_pending_sends() + { + await using var testFixture = await TestFixture.CreateAsync(testOutputHelper); + var scenario = TestScenarioFactory.Default(new XUnitOutputAdapter(testOutputHelper)); + + await using var connection = await testFixture.CreateConnectionAsync(); + await using var session = await connection.CreateSessionAsync(new SessionConfiguration + { + AutoCommitSends = false + }, testFixture.CancellationToken); + + var (addressName, queueName) = await scenario.Step("Create address and queue", async () => + { + var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast); + var queueName = await testFixture.CreateQueueAsync(addressName, RoutingType.Anycast); + return (addressName, queueName); + }); + + await using var producer = await scenario.Step("Create producer", async () => + { + return await session.CreateProducerAsync(new ProducerConfiguration + { + Address = addressName, + RoutingType = RoutingType.Anycast + }, testFixture.CancellationToken); + }); + + await scenario.Step("Send message", async () => + { + await producer.SendMessageAsync(new Message + { + Body = "test_payload"u8.ToArray() + }, testFixture.CancellationToken); + }); + + await scenario.Step("Rollback transaction", async () => + { + await session.RollbackAsync(testFixture.CancellationToken); + }); + + await scenario.Step("Confirm that the queue is empty", async () => + { + var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken); + Assert.NotNull(queueInfo); + Assert.Equal(0, queueInfo.MessageCount); + }); + } + + // TODO: Add test for Rollback with pending acks } \ No newline at end of file