Skip to content

Commit

Permalink
Move updating connectivity state outside of subchannel lock
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK committed Feb 24, 2025
1 parent 60d9d2c commit 35edd39
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/Grpc.Net.Client/Balancer/Subchannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public void UpdateAddresses(IReadOnlyList<BalancerAddress> addresses)
/// </summary>
public void RequestConnection()
{
var connectionRequested = false;
lock (Lock)
{
switch (_state)
Expand All @@ -245,7 +246,8 @@ public void RequestConnection()
SubchannelLog.ConnectionRequested(_logger, Id);

// Only start connecting underlying transport if in an idle state.
UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested.");
// Update connectivity state outside of subchannel lock to avoid deadlock.
connectionRequested = true;
break;
case ConnectivityState.Connecting:
case ConnectivityState.Ready:
Expand All @@ -264,6 +266,11 @@ public void RequestConnection()
}
}

if (connectionRequested)
{
UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested.");
}

// Don't capture the current ExecutionContext and its AsyncLocals onto the connect
var restoreFlow = false;
if (!ExecutionContext.IsFlowSuppressed())
Expand Down Expand Up @@ -448,6 +455,8 @@ internal bool UpdateConnectivityState(ConnectivityState state, string successDet

internal bool UpdateConnectivityState(ConnectivityState state, Status status)
{
Debug.Assert(!Monitor.IsEntered(Lock), "Ensure the subchannel lock isn't held here. Updating channel state with the subchannel lock can cause a deadlock.");

lock (Lock)
{
// Don't update subchannel state if the state is the same or the subchannel has been shutdown.
Expand All @@ -462,7 +471,7 @@ internal bool UpdateConnectivityState(ConnectivityState state, Status status)
}
_state = state;
}

// Notify channel outside of lock to avoid deadlocks.
_manager.OnSubchannelStateChange(this, state, status);
return true;
Expand Down
182 changes: 182 additions & 0 deletions test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endregion

#if SUPPORT_LOAD_BALANCING
using System.Diagnostics;
using System.Net;
using System.Threading.Channels;
using Greet;
Expand All @@ -29,6 +30,7 @@
using Grpc.Tests.Shared;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Logging.Testing;
using NUnit.Framework;
using ChannelState = Grpc.Net.Client.Balancer.ChannelState;
Expand Down Expand Up @@ -535,6 +537,186 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithZeroAddressResolve
await pickTask.DefaultTimeout();
}

[Test]
public async Task PickAsync_UpdateAddressesWhileRequestingConnection_DoesNotDeadlock()
{
var services = new ServiceCollection();
services.AddNUnitLogger();

var testSink = new TestSink();
var testProvider = new TestLoggerProvider(testSink);

services.AddLogging(b =>
{
b.AddProvider(testProvider);
});

await using var serviceProvider = services.BuildServiceProvider();
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();

var resolver = new TestResolver(loggerFactory);
resolver.UpdateAddresses(new List<BalancerAddress>
{
new BalancerAddress("localhost", 80)
});

var channelOptions = new GrpcChannelOptions();

var transportFactory = new TestSubchannelTransportFactory();
var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new PickFirstBalancerFactory() });
// Configure balancer similar to how GrpcChannel constructor does it
clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer(
c,
channelOptions.ServiceConfig,
clientChannel));

await clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None);

transportFactory.Transports.ForEach(t => t.Disconnect());

var requestConnectionSyncPoint = new SyncPoint(runContinuationsAsynchronously: true);
testSink.MessageLogged += (w) =>
{
if (w.EventId.Name == "ConnectionRequested")
{
requestConnectionSyncPoint.WaitToContinue().Wait();
}
};

// Task should pause when requesting connection because of the logger sink.
var pickTask = Task.Run(() => clientChannel.PickAsync(
new PickContext { Request = new HttpRequestMessage() },
waitForReady: true,
CancellationToken.None).AsTask());

// Wait until we're paused on requesting a connection.
await requestConnectionSyncPoint.WaitForSyncPoint().DefaultTimeout();

// Update addresses while requesting a connection.
var updateAddressesTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
var updateAddressesTask = Task.Run(() =>
{
updateAddressesTcs.TrySetResult(null);
resolver.UpdateAddresses(new List<BalancerAddress>
{
new BalancerAddress("localhost", 81)
});
});

// There isn't a clean way to wait for UpdateAddresses to be waiting for the subchannel lock.
// Use a long delay to ensure we're waiting for the lock and are in the right state.
await updateAddressesTcs.Task.DefaultTimeout();
await Task.Delay(500);
requestConnectionSyncPoint.Continue();

// Ensure the pick completes without deadlock.
try
{
await pickTask.DefaultTimeout();
}
catch (TimeoutException ex)
{
throw new InvalidOperationException("Likely deadlock when picking subchannel.", ex);
}
}

[Test]
public async Task PickAsync_ResolverUpdating_DoesNotDeadlock()
{
// Arrange
var services = new ServiceCollection();
//services.AddNUnitLogger();
await using var serviceProvider = services.BuildServiceProvider();
var loggerFactory = NullLoggerFactory.Instance;// serviceProvider.GetRequiredService<ILoggerFactory>();

var resolver = new TestResolver(loggerFactory);

var channelOptions = new GrpcChannelOptions();

var transportFactory = new TestSubchannelTransportFactory();
var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new PickFirstBalancerFactory() });
// Configure balancer similar to how GrpcChannel constructor does it
clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer(
c,
channelOptions.ServiceConfig,
clientChannel));

// Act
var connectTask = clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None);

var t = Task.Run(async () =>
{
while (true)
{
await Task.Delay(1000);
}
});

// continiously update addresses
_ = Task.Run(async () =>
{
var a1 = new List<BalancerAddress>
{
new BalancerAddress("localhost", 80)
};
var a2 = new List<BalancerAddress>
{
new BalancerAddress("localhost", 81)
};
var current = a1;

var count = 0;
while (true)
{
current = count % 2 == 0 ? a1 : a2;
//if (count > 10_000)
//{
// updateAddressesTcs.TrySetResult(null);
//}

resolver.UpdateAddresses(current);
count++;

//updateAddressChannel.Writer.TryWrite(true);

await Task.Delay(1);
}
});

// Simulate transport/network issue (with resolver reporting no addresses)
//transportFactory.Transports.ForEach(t => t.Disconnect());

try
{
for (int i = 0; i < 10_000; i++)
{

var pickTask = clientChannel.PickAsync(
new PickContext { Request = new HttpRequestMessage() },
waitForReady: true,
CancellationToken.None).AsTask();
//resolver.UpdateAddresses(new List<BalancerAddress>
//{
// new BalancerAddress("localhost", 80)
//});

// Assert
// Should not timeout (deadlock)
await pickTask.DefaultTimeout();
await Task.Delay(1);
}

}
catch (Exception ex)
{
_ = ex;
Debugger.Launch();
throw;
}

await t;
}

[Test]
public async Task PickAsync_ExecutionContext_DoesNotCaptureAsyncLocalsInConnect()
{
Expand Down

0 comments on commit 35edd39

Please sign in to comment.