Skip to content

Commit

Permalink
seed node work
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Jan 31, 2024
1 parent e73ea0f commit ac62b31
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 215 deletions.
2 changes: 2 additions & 0 deletions benchmarks/GossipBenchmark/Node1/Node1.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
<LangVersion>10</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster.SeedNode.Redis\Proto.Cluster.SeedNode.Redis.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster\Proto.Cluster.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/GossipBenchmark/Node1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Context;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using StackExchange.Redis;
using static Proto.CancellationTokens;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;

Expand Down Expand Up @@ -45,7 +48,7 @@ private static async Task Main()
.WithCluster(
ClusterConfig.Setup(
"MyCluster",
new SeedNodeClusterProvider(new(("127.0.0.1", 8090))),
new ConsulProvider(new ConsulProviderConfig()),
new PartitionIdentityLookup()
)
);
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/GossipBenchmark/Node2/Node2.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
<LangVersion>10</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster.SeedNode.Redis\Proto.Cluster.SeedNode.Redis.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster\Proto.Cluster.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/GossipBenchmark/Node2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Context;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using StackExchange.Redis;
using static System.Threading.Tasks.Task;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;

Expand Down Expand Up @@ -67,7 +70,7 @@ private static async Task Main()
ClusterConfig
.Setup(
"MyCluster",
new SeedNodeClusterProvider(),
new ConsulProvider(new ConsulProviderConfig()),
new PartitionIdentityLookup()
)
.WithClusterKind(
Expand Down
7 changes: 5 additions & 2 deletions src/Proto.Cluster/Seed/ISeedNodeDiscovery.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Proto.Cluster.Seed;


[PublicAPI]
public interface ISeedNodeDiscovery
{
Task Register(string memberId, string host, int port);
Task Remove(string memberId);
Task<(string memberId,string host, int port)[]> GetAll();
}
Task<(string memberId, string host, int port)[]> GetAll();
}
4 changes: 2 additions & 2 deletions src/Proto.Cluster/Seed/Messages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ namespace Proto.Cluster.Seed;

public record Connect;

public record Connected(Member Member);
public record Connected;

public record FailedToConnect;
public record FailedToConnect;
125 changes: 74 additions & 51 deletions src/Proto.Cluster/Seed/SeedClientNodeActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,84 @@ namespace Proto.Cluster.Seed;

public class SeedClientNodeActor : IActor
{
public const string Name = "$client_seed";
private static readonly ILogger Logger = Log.CreateLogger<SeedClientNodeActor>();
public const string Name = "$premium_client_seed";
private readonly ILogger _logger;
private readonly SeedNodeClusterProviderOptions _options;
private ClusterTopology? _clusterTopology;
private ImmutableDictionary<string, Member> _members = ImmutableDictionary<string, Member>.Empty;

private SeedClientNodeActor(SeedNodeClusterProviderOptions options)
private ImmutableDictionary<string, Member> _members = ImmutableDictionary<
string,
Member
>.Empty;

private SeedClientNodeActor(
SeedNodeClusterProviderOptions options,
ILogger logger
)
{
_options = options;
_logger = logger;
}

public Task ReceiveAsync(IContext context) =>
context.Message switch
public Task ReceiveAsync(IContext context)
{
return context.Message switch
{
Started => OnStarted(),
Connect _ => OnConnect(context),
Started => OnStarted(),
Connect _ => OnConnect(context),
ClusterTopology clusterTopology => OnClusterTopology(context, clusterTopology),
_ => Task.CompletedTask
_ => Task.CompletedTask
};
}

public static Props Props(SeedNodeClusterProviderOptions options) =>
Proto.Props.FromProducer(() => new SeedClientNodeActor(options));
public static Props Props(SeedNodeClusterProviderOptions options, ILogger logger)
{
return Proto.Props.FromProducer(() => new SeedClientNodeActor(options, logger));
}

private async Task OnConnect(IContext context)
{
var (selfHost, selfPort) = context.System.GetAddress();

foreach (var (host, port) in _options.SeedNodes)
bool connected = false;
// foreach (var (host, port) in _options.SeedNodes)
// {
// //never connect to yourself
// if (host == selfHost && port == selfPort)
// continue;
//
// try
// {
// var pid = PID.FromAddress(host + ":" + port, PremiumSeedNodeActor.Name);
//
// var res = await context.System.Root
// .RequestAsync<JoinResponse>(
// pid,
// new JoinAsClientRequest { SystemId = context.System.Id }
// )
// .ConfigureAwait(false);
//
// _members = _members.Add(res.Member.Id, res.Member);
//
// connected = true;
// _logger.LogInformation("Connected to seed node {Host}:{Port}", host, port);
//
// break;
// }
// catch (Exception x)
// {
// x.CheckFailFast();
// _logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port);
// }
// }

if (connected)
{
//never connect to yourself
if (host == selfHost && port == selfPort)
{
continue;
}

try
{
var pid = PID.FromAddress(host + ":" + port, SeedNodeActor.Name);

var res = await context.System.Root.RequestAsync<JoinResponse>(pid,
new JoinAsClientRequest { SystemId = context.System.Id }).ConfigureAwait(false);

_members = _members.Add(res.Member.Id, res.Member);
context.Respond(new Connected(res.Member));

break;
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port);
}
context.Respond(new Connected());
}
else
{
context.Respond(new FailedToConnect());
}

context.Respond(new FailedToConnect());
}

private async Task OnClusterTopology(IContext context, ClusterTopology clusterTopology)
Expand All @@ -80,46 +103,46 @@ private async Task OnClusterTopology(IContext context, ClusterTopology clusterTo
foreach (var member in clusterTopology.Left)
{
if (_members.ContainsKey(member.Id))
{
_members = _members.Remove(member.Id);
}

Logger.LogInformation("Removed member {member}", member);
_logger.LogInformation("Removed member {Member}", member);
}

foreach (var member in clusterTopology.Members)
{
if (_members.ContainsKey(member.Id))
{
continue;
}

var pid = PID.FromAddress(member.Address, SeedNodeActor.Name);

try
{
var res = await context.RequestAsync<JoinResponse>(pid,
new JoinAsClientRequest { SystemId = context.System.Id },
new CancellationTokenSource(5000).Token).ConfigureAwait(false);
var res = await context
.RequestAsync<JoinResponse>(
pid,
new JoinAsClientRequest { SystemId = context.System.Id },
new CancellationTokenSource(5000).Token
)
.ConfigureAwait(false);

_members = _members.Add(res.Member.Id, res.Member);
Logger.LogInformation("Connected to seed node {Member}", member.Address);
_logger.LogInformation("Connected to seed node {Member}", member.Address);
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "Failed to connect to seed node {Member}", member.Address);
_logger.LogError(e, "Failed to connect to seed node {Member}", member.Address);
}
}

context.System.EventStream.Publish(clusterTopology);
}
}

private static Task OnStarted()
private Task OnStarted()
{
Logger.LogInformation("Started SeedClientNodeActor");
_logger.LogInformation("Started SeedClientNodeActor");

return Task.CompletedTask;
}
}
}
Loading

0 comments on commit ac62b31

Please sign in to comment.