Skip to content

Commit

Permalink
Fix for case where a TopicActor can continue sending messages to a me…
Browse files Browse the repository at this point in the history
…mber that no longer exists. (#2147)
  • Loading branch information
benbenwilde authored Jan 28, 2025
1 parent 54eef66 commit ace44fd
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 3 deletions.
12 changes: 10 additions & 2 deletions src/Proto.Cluster/PubSub/BatchingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ private async Task PublishBatch(PubSubBatchWithReceipts batchWrapper)
{
retries++;

var response = await _publisher.PublishBatch(_topic, batchWrapper.Batch,
CancellationTokens.FromSeconds(_config.PublishTimeoutInSeconds)).ConfigureAwait(false);
// timeout immediately if the producer is stopped
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(
_cts.Token, CancellationTokens.FromSeconds(_config.PublishTimeoutInSeconds));
var response = await _publisher.PublishBatch(_topic, batchWrapper.Batch, tokenSource.Token).ConfigureAwait(false);

if (response == null)
{
Expand All @@ -244,6 +246,12 @@ private async Task PublishBatch(PubSubBatchWithReceipts batchWrapper)
}
catch (Exception e)
{
if (_cts.Token.IsCancellationRequested)
{
// we are stopping
break;
}

var decision = await _config.OnPublishingError(retries, e, batchWrapper.Batch).ConfigureAwait(false);

if (decision == PublishingErrorDecision.FailBatchAndStop)
Expand Down
19 changes: 18 additions & 1 deletion src/Proto.Cluster/PubSub/TopicActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Task ReceiveAsync(IContext context) =>
PubSubBatch batch => OnPubSubBatch(context, batch),
NotifyAboutFailingSubscribersRequest msg => OnNotifyAboutFailingSubscribers(context, msg),
ClusterTopology msg => OnClusterTopologyChanged(context, msg),
DeadLetterResponse msg => OnDeadLetterResponse(msg),
_ => Task.CompletedTask
};

Expand Down Expand Up @@ -113,7 +114,12 @@ from member in members

foreach (var md in memberDeliveries)
{
context.Send(md.Pid, md.Message);
// Use Request instead of Send. The delivery actor doesn't respond, but we'll get a DeadLetterResponse if we can't reach it.
// A Proto.Actor client can subscribe and then shut down without unsubscribing, and without us knowing that it left,
// since, as a client, it was never in the topology. This allows us to stop sending to a subscriber that no longer exists.
// In theory, this can unsubscribe a subscriber that is still alive if it is unreachable for a short time,
// but application-level logic is always required to ensure the subscription remains alive anyway.
context.Request(md.Pid, md.Message);
}

context.Respond(new PublishResponse());
Expand Down Expand Up @@ -298,4 +304,15 @@ private async Task OnSubscribe(IContext context, SubscribeRequest sub)
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
context.Respond(new SubscribeResponse());
}

/// <summary>
/// Reacts to a <see cref="DeadLetterResponse"/> by removing every subscriber whose PID lives at the
/// address of the unreachable target, then persisting the reduced subscriber set once.
/// </summary>
/// <param name="msg">The dead letter notification. Ignored when it carries no target.</param>
/// <returns>A task that completes once the updated subscriptions have been saved (if anything changed).</returns>
private async Task OnDeadLetterResponse(DeadLetterResponse msg)
{
    var target = msg.Target;
    if (target is null)
    {
        return;
    }

    // Matching is by address only, so every subscriber at that address is affected, not just the first one found.
    // Subscribers without a PID are skipped instead of being dereferenced
    // (presumably those are cluster-identity subscribers - verify against SubscriberIdentity).
    var deadLetterSubs = _subscribers
        .Where(s => s.Pid is not null && s.Pid.Address == target.Address)
        .ToList();

    if (deadLetterSubs.Count == 0)
    {
        return;
    }

    foreach (var deadLetterSub in deadLetterSubs)
    {
        _subscribers = _subscribers.Remove(deadLetterSub);
        Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed due to dead letter", _topic, deadLetterSub);
    }

    // Persist once after all removals rather than once per removed subscriber.
    await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
}
}
86 changes: 86 additions & 0 deletions tests/Proto.Cluster.PubSub.Tests/PubSubClientTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// -----------------------------------------------------------------------
// <copyright file = "PubSubClientTests.cs" company = "Asynkron AB">
// Copyright (C) 2015-2024 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

using FluentAssertions;
using Xunit;
using static Proto.Cluster.PubSub.Tests.WaitHelper;

namespace Proto.Cluster.PubSub.Tests;

/// <summary>
/// Verifies that a topic stops tracking a PID subscriber hosted on a cluster client
/// once that client has shut down without unsubscribing.
/// </summary>
[Collection("PubSub")] // The CI is just too slow to run cluster-fixture-based tests in parallel
public class PubSubClientTests : IAsyncLifetime
{
    private readonly PubSubClusterFixture _fixture;

    public PubSubClientTests() => _fixture = new PubSubClusterFixture();

    /// <summary>Starts one cluster member followed by one cluster client.</summary>
    public async Task InitializeAsync()
    {
        await _fixture.SpawnMember();
        await _fixture.SpawnClient();
    }

    /// <summary>Tears down the fixture.</summary>
    public Task DisposeAsync()
    {
        return _fixture.DisposeAsync();
    }

    [Fact]
    public async Task When_client_leaves_PID_subscribers_get_removed_due_to_dead_letter()
    {
        const string topic = "leaving-client";

        // Subscriber actor hosted on the client: records every DataPublished it receives.
        var subscriberProps = Props.FromFunc(ctx =>
            {
                if (ctx.Message is DataPublished published)
                {
                    var delivery = new Delivery(ctx.Self.ToDiagnosticString(), published.Data);
                    _fixture.Deliveries.Add(delivery);
                }

                return Task.CompletedTask;
            }
        );

        var leavingClient = _fixture.Clients.First();
        var subscriberPid = leavingClient.System.Root.Spawn(subscriberProps);
        await leavingClient.Subscribe(topic, subscriberPid);

        // The topic must list exactly our PID as its only subscriber.
        async Task AssertSubscriberIsRegistered()
        {
            var current = await _fixture.GetSubscribersForTopic(topic);
            current.Subscribers_.Count.Should().Be(1);
            current.Subscribers_.Should().Contain(s => s.Pid.Equals(subscriberPid));
        }

        // Only the very first message (Data == 1) may ever have been delivered.
        void AssertOnlyFirstMessageDelivered()
        {
            _fixture.Deliveries.Count.Should().Be(1);
            _fixture.Deliveries.Should().ContainSingle(d => d.Data == 1);
        }

        await AssertSubscriberIsRegistered();

        // While the client is alive, a published message reaches the subscriber.
        await _fixture.PublishData(topic, 1);

        await WaitUntil(() => _fixture.Deliveries.Count == 1);
        AssertOnlyFirstMessageDelivered();

        // Shut the client down.
        await _fixture.RemoveNode(leavingClient, graceful: true);

        // The subscription is still there: the client never unsubscribed and the topology did not change.
        await AssertSubscriberIsRegistered();

        // Publish twice with a pause in between; neither message may reach the departed subscriber.
        await _fixture.PublishData(topic, 2);
        await Task.Delay(3000);
        await _fixture.PublishData(topic, 2);

        // The dead letter should come back to the topic, which then drops the subscription.
        await WaitUntil(async () =>
        {
            var remaining = await _fixture.GetSubscribersForTopic(topic);
            return remaining.Subscribers_.Count == 0;
        });

        AssertOnlyFirstMessageDelivered();
    }
}

0 comments on commit ace44fd

Please sign in to comment.