From ace44fdfdad6811e1f1344a89ba7e41758a41298 Mon Sep 17 00:00:00 2001 From: Ben Wilde Date: Tue, 28 Jan 2025 02:25:55 -0600 Subject: [PATCH] Fix for case where a TopicActor can continue sending messages to a member that no longer exists. (#2147) --- src/Proto.Cluster/PubSub/BatchingProducer.cs | 12 ++- src/Proto.Cluster/PubSub/TopicActor.cs | 19 +++- .../PubSubClientTests.cs | 86 +++++++++++++++++++ 3 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 tests/Proto.Cluster.PubSub.Tests/PubSubClientTests.cs diff --git a/src/Proto.Cluster/PubSub/BatchingProducer.cs b/src/Proto.Cluster/PubSub/BatchingProducer.cs index 8ec56c7027..591dcd2163 100644 --- a/src/Proto.Cluster/PubSub/BatchingProducer.cs +++ b/src/Proto.Cluster/PubSub/BatchingProducer.cs @@ -231,8 +231,10 @@ private async Task PublishBatch(PubSubBatchWithReceipts batchWrapper) { retries++; - var response = await _publisher.PublishBatch(_topic, batchWrapper.Batch, - CancellationTokens.FromSeconds(_config.PublishTimeoutInSeconds)).ConfigureAwait(false); + // timeout immediately if the producer is stopped + using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource( + _cts.Token, CancellationTokens.FromSeconds(_config.PublishTimeoutInSeconds)); + var response = await _publisher.PublishBatch(_topic, batchWrapper.Batch, tokenSource.Token).ConfigureAwait(false); if (response == null) { @@ -244,6 +246,12 @@ private async Task PublishBatch(PubSubBatchWithReceipts batchWrapper) } catch (Exception e) { + if (_cts.Token.IsCancellationRequested) + { + // we are stopping + break; + } + var decision = await _config.OnPublishingError(retries, e, batchWrapper.Batch).ConfigureAwait(false); if (decision == PublishingErrorDecision.FailBatchAndStop) diff --git a/src/Proto.Cluster/PubSub/TopicActor.cs b/src/Proto.Cluster/PubSub/TopicActor.cs index 72da69bba4..c840dffea3 100644 --- a/src/Proto.Cluster/PubSub/TopicActor.cs +++ b/src/Proto.Cluster/PubSub/TopicActor.cs @@ -46,6 +46,7 @@ public Task ReceiveAsync(IContext context) => PubSubBatch batch => OnPubSubBatch(context, batch), NotifyAboutFailingSubscribersRequest msg => OnNotifyAboutFailingSubscribers(context, msg), ClusterTopology msg => OnClusterTopologyChanged(context, msg), + DeadLetterResponse msg => OnDeadLetterResponse(msg), _ => Task.CompletedTask }; @@ -113,7 +114,12 @@ from member in members foreach (var md in memberDeliveries) { - context.Send(md.Pid, md.Message); + // use request instead of send. The delivery actor doesn't respond, but we'll get a DeadLetterResponse if we can't reach it. + // it's possible for a proto actor client to subscribe, and then shutdown without unsubscribing, and without us knowing that it left, + // since as a client it was never in the topology. This will allow us to stop sending to a subscriber that no longer exists. + // in theory, it's possible to unsubscribe a subscriber that's still alive, if they are unreachable for a short time, + // but regardless, application level logic is always required to ensure the subscription remains alive anyway. + context.Request(md.Pid, md.Message); } context.Respond(new PublishResponse()); @@ -298,4 +304,15 @@ private async Task OnSubscribe(IContext context, SubscribeRequest sub) await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false); context.Respond(new SubscribeResponse()); } + + private async Task OnDeadLetterResponse(DeadLetterResponse msg) + { + var deadLetterSub = msg.Target == null ? null : _subscribers.FirstOrDefault(s => s.Pid.Address == msg.Target.Address); + if (deadLetterSub != null) + { + _subscribers = _subscribers.Remove(deadLetterSub); + Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed due to dead letter", _topic, deadLetterSub); + await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false); + } + } } diff --git a/tests/Proto.Cluster.PubSub.Tests/PubSubClientTests.cs b/tests/Proto.Cluster.PubSub.Tests/PubSubClientTests.cs new file mode 100644 index 0000000000..e497181fc4 --- /dev/null +++ b/tests/Proto.Cluster.PubSub.Tests/PubSubClientTests.cs @@ -0,0 +1,86 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2024 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- + +using FluentAssertions; +using Xunit; +using static Proto.Cluster.PubSub.Tests.WaitHelper; + +namespace Proto.Cluster.PubSub.Tests; + +[Collection("PubSub")] // The CI is just to slow to run cluster fixture based tests in parallel +public class PubSubClientTests : IAsyncLifetime +{ + private readonly PubSubClusterFixture _fixture; + + public PubSubClientTests() + { + _fixture = new PubSubClusterFixture(); + } + + public async Task InitializeAsync() + { + await _fixture.SpawnMember(); + await _fixture.SpawnClient(); + } + + public Task DisposeAsync() => _fixture.DisposeAsync(); + + [Fact] + public async Task When_client_leaves_PID_subscribers_get_removed_due_to_dead_letter() + { + const string topic = "leaving-client"; + + // pid subscriber + var props = Props.FromFunc(ctx => + { + if (ctx.Message is DataPublished msg) + { + _fixture.Deliveries.Add(new Delivery(ctx.Self.ToDiagnosticString(), msg.Data)); + } + + return Task.CompletedTask; + } + ); + + var client = _fixture.Clients.First(); + var clientPid = client.System.Root.Spawn(props); + await client.Subscribe(topic, clientPid); + + var subscribers = await _fixture.GetSubscribersForTopic(topic); + subscribers.Subscribers_.Count.Should().Be(1); + subscribers.Subscribers_.Should().Contain(s => s.Pid.Equals(clientPid)); + + // message should send + await _fixture.PublishData(topic, 1); + + await WaitUntil(() => _fixture.Deliveries.Count == 1); + _fixture.Deliveries.Count.Should().Be(1); + _fixture.Deliveries.Should().ContainSingle(d => d.Data == 1); + + // shutdown client + await _fixture.RemoveNode(client, graceful: true); + + // subscription should remain, because it never unsubscribed, and there was no topology change + subscribers = await _fixture.GetSubscribersForTopic(topic); + subscribers.Subscribers_.Count.Should().Be(1); + subscribers.Subscribers_.Should().Contain(s => s.Pid.Equals(clientPid)); + + // messages should not send or attempt to send + await _fixture.PublishData(topic, 2); + await Task.Delay(3000); + await _fixture.PublishData(topic, 2); + + // dead letter should be received, so the subscription is removed + await WaitUntil(async () => + { + subscribers = await _fixture.GetSubscribersForTopic(topic); + return subscribers.Subscribers_.Count == 0; + }); + + _fixture.Deliveries.Count.Should().Be(1); + _fixture.Deliveries.Should().ContainSingle(d => d.Data == 1); + } +} \ No newline at end of file