From 8023a7adee2c7fa2fae583ad7f1044483f967ab1 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 27 Jan 2023 19:46:18 +0100 Subject: [PATCH] Harden gossip consensus (#1911) * test larger clusters * remove redundant purge --- src/Proto.Cluster/Gossip/Gossip.cs | 6 ++---- src/Proto.Cluster/Gossip/GossipActor.cs | 4 +--- src/Proto.Cluster/Gossip/Gossiper.cs | 2 +- src/Proto.Cluster/GossipContracts.proto | 2 +- tests/Proto.Cluster.Tests/GossipCoreTests.cs | 4 ++-- 5 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/Proto.Cluster/Gossip/Gossip.cs b/src/Proto.Cluster/Gossip/Gossip.cs index 198b70aba3..924fd4bb55 100644 --- a/src/Proto.Cluster/Gossip/Gossip.cs +++ b/src/Proto.Cluster/Gossip/Gossip.cs @@ -90,9 +90,7 @@ public ImmutableList ReceiveState(GossipState remoteState) } _state = newState; - - //TODO: Optimize - Purge(); + CheckConsensus(updatedKeys); return updates.ToImmutableList(); @@ -253,7 +251,7 @@ private void Purge() { //find all members that have sent topology var members = _getMembers(); - + foreach (var memberId in _state.Members.Keys.ToArray()) { if (!members.Contains(memberId)) diff --git a/src/Proto.Cluster/Gossip/GossipActor.cs b/src/Proto.Cluster/Gossip/GossipActor.cs index 5921aa2a1c..012692e41a 100644 --- a/src/Proto.Cluster/Gossip/GossipActor.cs +++ b/src/Proto.Cluster/Gossip/GossipActor.cs @@ -16,7 +16,6 @@ public class GossipActor : IActor private static readonly ILogger Logger = Log.CreateLogger(); private readonly TimeSpan _gossipRequestTimeout; private readonly IGossip _internal; - private readonly ActorSystem _system; // lookup from state key -> consensus checks @@ -29,11 +28,10 @@ public GossipActor( int gossipMaxSend ) { - _system = system; _gossipRequestTimeout = gossipRequestTimeout; _internal = new Gossip(myId, gossipFanout, gossipMaxSend, instanceLogger, - () => _system.Cluster().MemberList.GetMembers()); + () => system.Cluster().MemberList.GetMembers()); } public Task ReceiveAsync(IContext context) => diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs index 5a1e43d75d..61e3cf723f 100644 --- a/src/Proto.Cluster/Gossip/Gossiper.cs +++ b/src/Proto.Cluster/Gossip/Gossiper.cs @@ -114,7 +114,7 @@ public async Task> GetStateEntry(str try { var res = await _context.RequestAsync(_pid, - new GetGossipStateEntryRequest(key)); + new GetGossipStateEntryRequest(key),CancellationTokens.FromSeconds(5)); return res.State; } diff --git a/src/Proto.Cluster/GossipContracts.proto b/src/Proto.Cluster/GossipContracts.proto index 3e2273ac91..92957005e9 100644 --- a/src/Proto.Cluster/GossipContracts.proto +++ b/src/Proto.Cluster/GossipContracts.proto @@ -32,7 +32,7 @@ message GossipState { //we still know when _we_ as in this node, got this data. //and we can measure time from then til now. // -//if we got a hear-beat from another node, and X seconds pass, we can assume it to be dead +//if we got a heartbeat from another node, and X seconds pass, we can assume it to be dead message GossipKeyValue { int64 sequence_number = 2; //version is local to the owner member google.protobuf.Any value = 4; //value is any format diff --git a/tests/Proto.Cluster.Tests/GossipCoreTests.cs b/tests/Proto.Cluster.Tests/GossipCoreTests.cs index 047123082e..d8de25f45e 100644 --- a/tests/Proto.Cluster.Tests/GossipCoreTests.cs +++ b/tests/Proto.Cluster.Tests/GossipCoreTests.cs @@ -30,8 +30,8 @@ public GossipCoreTests(ITestOutputHelper output) [Fact] public async Task Large_cluster_should_get_topology_consensus() { - const int memberCount = 50; - const int fanout = 4; + const int memberCount = 300; + const int fanout = 3; var members = Enumerable