From 35edd39192b8aafc185bfb924c914a817312e5d2 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Mon, 24 Feb 2025 23:38:15 +0800 Subject: [PATCH] Move updating connectivity state outside of subchannel lock --- src/Grpc.Net.Client/Balancer/Subchannel.cs | 13 +- .../Balancer/ConnectionManagerTests.cs | 182 ++++++++++++++++++ 2 files changed, 193 insertions(+), 2 deletions(-) diff --git a/src/Grpc.Net.Client/Balancer/Subchannel.cs b/src/Grpc.Net.Client/Balancer/Subchannel.cs index db0a2ee42..d80d5adeb 100644 --- a/src/Grpc.Net.Client/Balancer/Subchannel.cs +++ b/src/Grpc.Net.Client/Balancer/Subchannel.cs @@ -237,6 +237,7 @@ public void UpdateAddresses(IReadOnlyList addresses) /// public void RequestConnection() { + var connectionRequested = false; lock (Lock) { switch (_state) @@ -245,7 +246,8 @@ public void RequestConnection() SubchannelLog.ConnectionRequested(_logger, Id); // Only start connecting underlying transport if in an idle state. - UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested."); + // Update connectivity state outside of subchannel lock to avoid deadlock. + connectionRequested = true; break; case ConnectivityState.Connecting: case ConnectivityState.Ready: @@ -264,6 +266,11 @@ public void RequestConnection() } } + if (connectionRequested) + { + UpdateConnectivityState(ConnectivityState.Connecting, "Connection requested."); + } + // Don't capture the current ExecutionContext and its AsyncLocals onto the connect var restoreFlow = false; if (!ExecutionContext.IsFlowSuppressed()) @@ -448,6 +455,8 @@ internal bool UpdateConnectivityState(ConnectivityState state, string successDet internal bool UpdateConnectivityState(ConnectivityState state, Status status) { + Debug.Assert(!Monitor.IsEntered(Lock), "Ensure the subchannel lock isn't held here. Updating channel state with the subchannel lock can cause a deadlock."); + lock (Lock) { // Don't update subchannel state if the state is the same or the subchannel has been shutdown. @@ -462,7 +471,7 @@ internal bool UpdateConnectivityState(ConnectivityState state, Status status) } _state = state; } - + // Notify channel outside of lock to avoid deadlocks. _manager.OnSubchannelStateChange(this, state, status); return true; diff --git a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs index 743639886..3dbf36beb 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs @@ -17,6 +17,7 @@ #endregion #if SUPPORT_LOAD_BALANCING +using System.Diagnostics; using System.Net; using System.Threading.Channels; using Greet; @@ -29,6 +30,7 @@ using Grpc.Tests.Shared; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Testing; using NUnit.Framework; using ChannelState = Grpc.Net.Client.Balancer.ChannelState; @@ -535,6 +537,186 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithZeroAddressResolve await pickTask.DefaultTimeout(); } + [Test] + public async Task PickAsync_UpdateAddressesWhileRequestingConnection_DoesNotDeadlock() + { + var services = new ServiceCollection(); + services.AddNUnitLogger(); + + var testSink = new TestSink(); + var testProvider = new TestLoggerProvider(testSink); + + services.AddLogging(b => + { + b.AddProvider(testProvider); + }); + + await using var serviceProvider = services.BuildServiceProvider(); + var loggerFactory = serviceProvider.GetRequiredService(); + + var resolver = new TestResolver(loggerFactory); + resolver.UpdateAddresses(new List + { + new BalancerAddress("localhost", 80) + }); + + var channelOptions = new GrpcChannelOptions(); + + var transportFactory = new TestSubchannelTransportFactory(); + var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new PickFirstBalancerFactory() }); + // Configure balancer similar to how GrpcChannel constructor does it + clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer( + c, + channelOptions.ServiceConfig, + clientChannel)); + + await clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None); + + transportFactory.Transports.ForEach(t => t.Disconnect()); + + var requestConnectionSyncPoint = new SyncPoint(runContinuationsAsynchronously: true); + testSink.MessageLogged += (w) => + { + if (w.EventId.Name == "ConnectionRequested") + { + requestConnectionSyncPoint.WaitToContinue().Wait(); + } + }; + + // Task should pause when requesting connection because of the logger sink. + var pickTask = Task.Run(() => clientChannel.PickAsync( + new PickContext { Request = new HttpRequestMessage() }, + waitForReady: true, + CancellationToken.None).AsTask()); + + // Wait until we're paused on requesting a connection. + await requestConnectionSyncPoint.WaitForSyncPoint().DefaultTimeout(); + + // Update addresses while requesting a connection. + var updateAddressesTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var updateAddressesTask = Task.Run(() => + { + updateAddressesTcs.TrySetResult(null); + resolver.UpdateAddresses(new List + { + new BalancerAddress("localhost", 81) + }); + }); + + // There isn't a clean way to wait for UpdateAddresses to be waiting for the subchannel lock. + // Use a long delay to ensure we're waiting for the lock and are in the right state. + await updateAddressesTcs.Task.DefaultTimeout(); + await Task.Delay(500); + requestConnectionSyncPoint.Continue(); + + // Ensure the pick completes without deadlock. + try + { + await pickTask.DefaultTimeout(); + } + catch (TimeoutException ex) + { + throw new InvalidOperationException("Likely deadlock when picking subchannel.", ex); + } + } + + [Test] + public async Task PickAsync_ResolverUpdating_DoesNotDeadlock() + { + // Arrange + var services = new ServiceCollection(); + //services.AddNUnitLogger(); + await using var serviceProvider = services.BuildServiceProvider(); + var loggerFactory = NullLoggerFactory.Instance;// serviceProvider.GetRequiredService(); + + var resolver = new TestResolver(loggerFactory); + + var channelOptions = new GrpcChannelOptions(); + + var transportFactory = new TestSubchannelTransportFactory(); + var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new PickFirstBalancerFactory() }); + // Configure balancer similar to how GrpcChannel constructor does it + clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer( + c, + channelOptions.ServiceConfig, + clientChannel)); + + // Act + var connectTask = clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None); + + var t = Task.Run(async () => + { + while (true) + { + await Task.Delay(1000); + } + }); + + // continiously update addresses + _ = Task.Run(async () => + { + var a1 = new List + { + new BalancerAddress("localhost", 80) + }; + var a2 = new List + { + new BalancerAddress("localhost", 81) + }; + var current = a1; + + var count = 0; + while (true) + { + current = count % 2 == 0 ? a1 : a2; + //if (count > 10_000) + //{ + // updateAddressesTcs.TrySetResult(null); + //} + + resolver.UpdateAddresses(current); + count++; + + //updateAddressChannel.Writer.TryWrite(true); + + await Task.Delay(1); + } + }); + + // Simulate transport/network issue (with resolver reporting no addresses) + //transportFactory.Transports.ForEach(t => t.Disconnect()); + + try + { + for (int i = 0; i < 10_000; i++) + { + + var pickTask = clientChannel.PickAsync( + new PickContext { Request = new HttpRequestMessage() }, + waitForReady: true, + CancellationToken.None).AsTask(); + //resolver.UpdateAddresses(new List + //{ + // new BalancerAddress("localhost", 80) + //}); + + // Assert + // Should not timeout (deadlock) + await pickTask.DefaultTimeout(); + await Task.Delay(1); + } + + } + catch (Exception ex) + { + _ = ex; + Debugger.Launch(); + throw; + } + + await t; + } + [Test] public async Task PickAsync_ExecutionContext_DoesNotCaptureAsyncLocalsInConnect() {