Skip to content

Commit

Permalink
Spanmap (#1887)
Browse files Browse the repository at this point in the history
* tracing in pubsub tests
rogeralsing authored Dec 6, 2022
1 parent bd33bb1 commit f6963ed
Showing 3 changed files with 220 additions and 188 deletions.
360 changes: 196 additions & 164 deletions tests/Proto.Cluster.PubSub.Tests/PubSubTests.cs
Original file line number Diff line number Diff line change
@@ -30,281 +30,313 @@ public PubSubTests(PubSubClusterFixture fixture, ITestOutputHelper output)
[Fact]
public async Task Can_deliver_single_messages()
{
var subscriberIds = _fixture.SubscriberIds("single-test", 20);
const string topic = "single-test-topic";
const int numMessages = 100;

await _fixture.SubscribeAllTo(topic, subscriberIds);

for (var i = 0; i < numMessages; i++)
await _fixture.Trace(async () =>
{
var response = await _fixture.PublishData(topic, i);
var subscriberIds = _fixture.SubscriberIds("single-test", 20);
const string topic = "single-test-topic";
const int numMessages = 100;

await _fixture.SubscribeAllTo(topic, subscriberIds);

if (response == null)
for (var i = 0; i < numMessages; i++)
{
_output.WriteLine(await _fixture.Members.DumpClusterState());
}
var response = await _fixture.PublishData(topic, i);

if (response == null)
{
_output.WriteLine(await _fixture.Members.DumpClusterState());
}

response.Should().NotBeNull("publishing should not time out");
response!.Status.Should().Be(PublishStatus.Ok);
}
response.Should().NotBeNull("publishing should not time out");
response!.Status.Should().Be(PublishStatus.Ok);
}

await _fixture.VerifyAllSubscribersGotAllTheData(subscriberIds, numMessages);
await _fixture.VerifyAllSubscribersGotAllTheData(subscriberIds, numMessages);
});
}

[Fact]
public async Task Can_deliver_message_batches()
{
var subscriberIds = _fixture.SubscriberIds("batch-test", 20);
const string topic = "batch-test-topic";
const int numMessages = 100;

await _fixture.SubscribeAllTo(topic, subscriberIds);

for (var i = 0; i < numMessages / 10; i++)
await _fixture.Trace(async () =>
{
var data = Enumerable.Range(i * 10, 10).ToArray();
var response = await _fixture.PublishDataBatch(topic, data);
var subscriberIds = _fixture.SubscriberIds("batch-test", 20);
const string topic = "batch-test-topic";
const int numMessages = 100;

await _fixture.SubscribeAllTo(topic, subscriberIds);

if (response == null)
for (var i = 0; i < numMessages / 10; i++)
{
_output.WriteLine(await _fixture.Members.DumpClusterState());
}
var data = Enumerable.Range(i * 10, 10).ToArray();
var response = await _fixture.PublishDataBatch(topic, data);

response.Should().NotBeNull("publishing should not time out");
}
if (response == null)
{
_output.WriteLine(await _fixture.Members.DumpClusterState());
}

response.Should().NotBeNull("publishing should not time out");
}

await _fixture.VerifyAllSubscribersGotAllTheData(subscriberIds, numMessages);
await _fixture.VerifyAllSubscribersGotAllTheData(subscriberIds, numMessages);
});
}

[Fact]
public async Task Unsubscribed_actor_does_not_receive_messages()
{
const string sub1 = "unsubscribe-test-1";
const string sub2 = "unsubscribe-test-2";
const string topic = "unsubscribe-test";
await _fixture.Trace(async () =>
{
const string sub1 = "unsubscribe-test-1";
const string sub2 = "unsubscribe-test-2";
const string topic = "unsubscribe-test";

await _fixture.SubscribeTo(topic, sub1);
await _fixture.SubscribeTo(topic, sub2);
await _fixture.SubscribeTo(topic, sub1);
await _fixture.SubscribeTo(topic, sub2);

await _fixture.UnsubscribeFrom(topic, sub2);
await _fixture.UnsubscribeFrom(topic, sub2);

await _fixture.PublishData(topic, 1);
await Task.Delay(1000); // give time for the message "not to be delivered" to second subscriber
await _fixture.PublishData(topic, 1);
await Task.Delay(1000); // give time for the message "not to be delivered" to second subscriber

await WaitUntil(() => _fixture.Deliveries.Count == 1,
"only one delivery should happen because the other actor is unsubscribed");
await WaitUntil(() => _fixture.Deliveries.Count == 1,
"only one delivery should happen because the other actor is unsubscribed");

_fixture.Deliveries.Should()
.HaveCount(1, "only one delivery should happen because the other actor is unsubscribed");
_fixture.Deliveries.Should()
.HaveCount(1, "only one delivery should happen because the other actor is unsubscribed");

_fixture.Deliveries.First().Identity.Should().Be(sub1, "the other actor should be unsubscribed");
_fixture.Deliveries.First().Identity.Should().Be(sub1, "the other actor should be unsubscribed");
});
}

[Fact]
public async Task Can_subscribe_with_PID()
{
const string topic = "pid-subscribe";
await _fixture.Trace(async () =>
{
const string topic = "pid-subscribe";

DataPublished? deliveredMessage = null;
DataPublished? deliveredMessage = null;

var props = Props.FromFunc(ctx =>
{
if (ctx.Message is DataPublished d)
var props = Props.FromFunc(ctx =>
{
deliveredMessage = d;
}
if (ctx.Message is DataPublished d)
{
deliveredMessage = d;
}

return Task.CompletedTask;
}
);
return Task.CompletedTask;
}
);

var member = _fixture.Members.First();
var pid = member.System.Root.Spawn(props);
await member.Subscribe(topic, pid);
var member = _fixture.Members.First();
var pid = member.System.Root.Spawn(props);
await member.Subscribe(topic, pid);

await _fixture.PublishData(topic, 1);
await _fixture.PublishData(topic, 1);

await WaitUntil(() => deliveredMessage != null, "Message should be delivered");
deliveredMessage.Should().BeEquivalentTo(new DataPublished(1));
await WaitUntil(() => deliveredMessage != null, "Message should be delivered");
deliveredMessage.Should().BeEquivalentTo(new DataPublished(1));
});
}

[Fact]
public async Task Can_unsubscribe_with_PID()
{
const string topic = "pid-unsubscribe";
await _fixture.Trace(async () =>
{
const string topic = "pid-unsubscribe";

var deliveryCount = 0;
var deliveryCount = 0;

var props = Props.FromFunc(ctx =>
{
if (ctx.Message is DataPublished)
var props = Props.FromFunc(ctx =>
{
Interlocked.Increment(ref deliveryCount);
}
if (ctx.Message is DataPublished)
{
Interlocked.Increment(ref deliveryCount);
}

return Task.CompletedTask;
}
);
return Task.CompletedTask;
}
);

var member = _fixture.Members.First();
var pid = member.System.Root.Spawn(props);
var member = _fixture.Members.First();
var pid = member.System.Root.Spawn(props);

await member.Subscribe(topic, pid);
await member.Unsubscribe(topic, pid);
await member.Subscribe(topic, pid);
await member.Unsubscribe(topic, pid);

await _fixture.PublishData(topic, 1);
await Task.Delay(1000); // give time for the message "not to be delivered" to second subscriber
await _fixture.PublishData(topic, 1);
await Task.Delay(1000); // give time for the message "not to be delivered" to second subscriber

deliveryCount.Should().Be(0);
deliveryCount.Should().Be(0);
});
}

[Fact]
public async Task Stopped_actor_that_did_not_unsubscribe_does_not_block_publishing_to_topic()
{
const string topic = "missing-unsubscribe";
await _fixture.Trace(async () =>
{
const string topic = "missing-unsubscribe";

var deliveryCount = 0;
var deliveryCount = 0;

// this scenario is only relevant for regular actors,
// virtual actors always exist, so the msgs should never be deadlettered
var props = Props.FromFunc(ctx =>
{
if (ctx.Message is DataPublished)
// this scenario is only relevant for regular actors,
// virtual actors always exist, so the msgs should never be deadlettered
var props = Props.FromFunc(ctx =>
{
Interlocked.Increment(ref deliveryCount);
}
if (ctx.Message is DataPublished)
{
Interlocked.Increment(ref deliveryCount);
}

return Task.CompletedTask;
}
);
return Task.CompletedTask;
}
);

// spawn two actors and subscribe them to the topic
var member = _fixture.Members.First();
var pid1 = member.System.Root.Spawn(props);
var pid2 = member.System.Root.Spawn(props);
// spawn two actors and subscribe them to the topic
var member = _fixture.Members.First();
var pid1 = member.System.Root.Spawn(props);
var pid2 = member.System.Root.Spawn(props);

await member.Subscribe(topic, pid1);
await member.Subscribe(topic, pid2);
await member.Subscribe(topic, pid1);
await member.Subscribe(topic, pid2);

// publish one message
await _fixture.PublishData(topic, 1);
await WaitUntil(() => deliveryCount == 2, "both messages should be delivered");
// publish one message
await _fixture.PublishData(topic, 1);
await WaitUntil(() => deliveryCount == 2, "both messages should be delivered");

// kill one of the actors
await member.System.Root.StopAsync(pid2);
// kill one of the actors
await member.System.Root.StopAsync(pid2);

// publish again
var response = await _fixture.PublishData(topic, 2);
// publish again
var response = await _fixture.PublishData(topic, 2);

response.Should().NotBeNull("the publish operation shouldn't have timed out");
await WaitUntil(() => deliveryCount == 3, "second publish should be delivered only to one of the actors");
response.Should().NotBeNull("the publish operation shouldn't have timed out");
await WaitUntil(() => deliveryCount == 3, "second publish should be delivered only to one of the actors");

await WaitUntil(async () =>
{
var subscribers = await _fixture.GetSubscribersForTopic(topic);
await WaitUntil(async () =>
{
var subscribers = await _fixture.GetSubscribersForTopic(topic);

return !subscribers.Subscribers_!.Contains(new SubscriberIdentity { Pid = pid2 });
}
);
return !subscribers.Subscribers_!.Contains(new SubscriberIdentity { Pid = pid2 });
}
);
});
}

[Fact]
public async Task Slow_PID_subscriber_that_times_out_does_not_prevent_subsequent_publishes()
{
const string topic = "slow-pid-subscriber";
await _fixture.Trace(async () =>
{
const string topic = "slow-pid-subscriber";

var deliveryCount = 0;
var deliveryCount = 0;

// a slow subscriber that times out
var props = Props.FromFunc(async ctx =>
{
await Task.Delay(4000,
_fixture
.CancelWhenDisposing); // 4 seconds is longer than the subscriber timeout configured in the test fixture
// a slow subscriber that times out
var props = Props.FromFunc(async ctx =>
{
await Task.Delay(4000,
_fixture
.CancelWhenDisposing); // 4 seconds is longer than the subscriber timeout configured in the test fixture

Interlocked.Increment(ref deliveryCount);
}
);
Interlocked.Increment(ref deliveryCount);
}
);

// subscribe
var member = _fixture.RandomMember();
var pid = member.System.Root.Spawn(props);
await member.Subscribe(topic, pid);
// subscribe
var member = _fixture.RandomMember();
var pid = member.System.Root.Spawn(props);
await member.Subscribe(topic, pid);

// publish one message
await _fixture.PublishData(topic, 1);
// publish one message
await _fixture.PublishData(topic, 1);

// next published message should also be delivered
await _fixture.PublishData(topic, 1);
// next published message should also be delivered
await _fixture.PublishData(topic, 1);

await WaitUntil(() => deliveryCount == 2,
"A timing out subscriber should not prevent subsequent publishes", TimeSpan.FromSeconds(10)
);
await WaitUntil(() => deliveryCount == 2,
"A timing out subscriber should not prevent subsequent publishes", TimeSpan.FromSeconds(10)
);
});
}

[Fact]
public async Task Slow_ClusterIdentity_subscriber_that_times_out_does_not_prevent_subsequent_publishes()
{
const string topic = "slow-ci-subscriber";
await _fixture.Trace(async () =>
{
const string topic = "slow-ci-subscriber";

// subscribe
await _fixture.SubscribeTo(topic, "slow-ci-1", PubSubClusterFixture.TimeoutSubscriberKind);
// subscribe
await _fixture.SubscribeTo(topic, "slow-ci-1", PubSubClusterFixture.TimeoutSubscriberKind);

// publish one message
await _fixture.PublishData(topic, 1);
// publish one message
await _fixture.PublishData(topic, 1);

// next published message should also be delivered
await _fixture.PublishData(topic, 1);
// next published message should also be delivered
await _fixture.PublishData(topic, 1);

await WaitUntil(() => _fixture.Deliveries.Count == 2,
"A timing out subscriber should not prevent subsequent publishes", TimeSpan.FromSeconds(10)
);
await WaitUntil(() => _fixture.Deliveries.Count == 2,
"A timing out subscriber should not prevent subsequent publishes", TimeSpan.FromSeconds(10)
);
});
}

[Fact]
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
public async Task Can_publish_messages_via_batching_producer()
{
var subscriberIds = _fixture.SubscriberIds("batching-producer-test", 20);
const string topic = "batching-producer";
const int numMessages = 100;
await _fixture.Trace(async () =>
{
var subscriberIds = _fixture.SubscriberIds("batching-producer-test", 20);
const string topic = "batching-producer";
const int numMessages = 100;

await _fixture.SubscribeAllTo(topic, subscriberIds);
await _fixture.SubscribeAllTo(topic, subscriberIds);

await using var producer = _fixture.Members.First()
.BatchingProducer(topic, new BatchingProducerConfig { BatchSize = 10 });
await using var producer = _fixture.Members.First()
.BatchingProducer(topic, new BatchingProducerConfig { BatchSize = 10 });

var tasks = Enumerable.Range(0, numMessages).Select(i => producer.ProduceAsync(new DataPublished(i)));
await Task.WhenAll(tasks);
var tasks = Enumerable.Range(0, numMessages).Select(i => producer.ProduceAsync(new DataPublished(i)));
await Task.WhenAll(tasks);

await _fixture.VerifyAllSubscribersGotAllTheData(subscriberIds, numMessages);
await _fixture.VerifyAllSubscribersGotAllTheData(subscriberIds, numMessages);
});
}

[Fact]
public async Task Will_expire_topic_actor_after_idle()
{
var subscriberIds = _fixture.SubscriberIds("batching-producer-test", 20);
const string topic = "batching-producer";
const int numMessages = 100;
await _fixture.Trace(async () =>
{
var subscriberIds = _fixture.SubscriberIds("batching-producer-test", 20);
const string topic = "batching-producer";
const int numMessages = 100;

await _fixture.SubscribeAllTo(topic, subscriberIds);
await _fixture.SubscribeAllTo(topic, subscriberIds);

var firstCluster = _fixture.Members.First();
var firstCluster = _fixture.Members.First();

await using var producer = firstCluster
.BatchingProducer(topic, new BatchingProducerConfig {PublisherIdleTimeout = TimeSpan.FromSeconds(2)});
await using var producer = firstCluster
.BatchingProducer(topic, new BatchingProducerConfig { PublisherIdleTimeout = TimeSpan.FromSeconds(2) });

var tasks = Enumerable.Range(0, numMessages).Select(i => producer.ProduceAsync(new DataPublished(i)));
await Task.WhenAll(tasks);
var tasks = Enumerable.Range(0, numMessages).Select(i => producer.ProduceAsync(new DataPublished(i)));
await Task.WhenAll(tasks);

var pid = await firstCluster.GetAsync(ClusterIdentity.Create(topic, TopicActor.Kind), CancellationTokens.FromSeconds(2));
Assert.NotNull(pid);
var pid = await firstCluster.GetAsync(ClusterIdentity.Create(topic, TopicActor.Kind),
CancellationTokens.FromSeconds(2));
Assert.NotNull(pid);

await Task.Delay(TimeSpan.FromSeconds(5));
await Task.Delay(TimeSpan.FromSeconds(5));

var newPid = await firstCluster.GetAsync(ClusterIdentity.Create(topic, TopicActor.Kind), CancellationTokens.FromSeconds(2));
Assert.NotEqual(newPid, pid);
var newPid = await firstCluster.GetAsync(ClusterIdentity.Create(topic, TopicActor.Kind),
CancellationTokens.FromSeconds(2));
Assert.NotEqual(newPid, pid);
});
}

private void Log(string message) => _output.WriteLine($"[{DateTime.Now:hh:mm:ss.fff}] {message}");
2 changes: 1 addition & 1 deletion tests/Proto.Cluster.Tests/ClusterFixture.cs
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ public async Task RemoveNode(Cluster member, bool graceful = true)
}
}

public Task Trace(Func<Task> test, string testName = "")
public Task Trace(Func<Task> test, [CallerMemberName] string testName = "")
{
return _reporter.Run(test, testName);
}
46 changes: 23 additions & 23 deletions tests/Proto.Cluster.Tests/GithubActionsReporter.cs
Original file line number Diff line number Diff line change
@@ -26,48 +26,48 @@ public GithubActionsReporter(string reportName)
private static Activity? StartActivity([CallerMemberName] string callerName = "N/A") =>
ActivitySource.StartActivity(callerName);

private List<TestResult> _results = new();
private readonly List<TestResult> _results = new();

private record TestResult(string Name, string TraceId, TimeSpan Duration, Exception? Exception= null);

private readonly StringBuilder _output = new();

public async Task Run(Func<Task> test, [CallerMemberName]string testName="")
public async Task Run(Func<Task> test, [CallerMemberName] string testName = "")
{
await Task.Delay(1).ConfigureAwait(false);

using var activity = StartActivity(testName);
var traceId= activity?.Context.TraceId.ToString().ToUpperInvariant() ?? "N/A";
var traceId = activity?.Context.TraceId.ToString().ToUpperInvariant() ?? "N/A";
Logger.LogInformation("Test started");
Exception? exception = null;

var sw = Stopwatch.StartNew();

if (activity is not null)
try
{
traceId = activity.TraceId.ToString();
activity.AddTag("test.name", testName);

var traceViewUrl =
$"{TracingSettings.TraceViewUrl}/logs?traceId={traceId}";
if (activity is not null)
{
traceId = activity.TraceId.ToString();
activity.AddTag("test.name", testName);

Console.WriteLine($"Running test: {testName}");
Console.WriteLine(traceViewUrl);
}
var traceViewUrl =
$"{TracingSettings.TraceViewUrl}/logs?traceId={traceId}";

try
{
Console.WriteLine($"Running test: {testName}");
Console.WriteLine(traceViewUrl);
}

await test();
Logger.LogInformation("Test succeeded");
_results.Add(new TestResult(testName, traceId, sw.Elapsed));
}
catch(Exception x)
catch (Exception x)
{
exception = x;
Logger.LogError(x,"Test failed");
_results.Add(new TestResult(testName, traceId, sw.Elapsed, x));
Logger.LogError(x, "Test failed");
throw;
}
sw.Stop();
if (activity is not null)
finally
{
_results.Add(new TestResult(testName, traceId, sw.Elapsed, exception));
sw.Stop();
}
}

0 comments on commit f6963ed

Please sign in to comment.