diff --git a/pkg/remote/trans/nphttp2/client_conn.go b/pkg/remote/trans/nphttp2/client_conn.go index 34929daa2a..f61f10c5c7 100644 --- a/pkg/remote/trans/nphttp2/client_conn.go +++ b/pkg/remote/trans/nphttp2/client_conn.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "github.com/cloudwego/kitex/pkg/gofunc" "github.com/cloudwego/kitex/pkg/remote" "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes" "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc" @@ -100,6 +101,32 @@ func newClientConn(ctx context.Context, tr grpc.ClientTransport, addr string) (* if err != nil { return nil, err } + // gRPC unary do not need to monitor the stream ctx and transport ctx + // since it must invoke stream.Recv which would inspect the stream.ctx + if isStreaming { + gofunc.GoFunc(ctx, func() { + sCtx := s.Context() + select { + // For these scenarios, stream.ctx would be canceled: + // 1. user invoke cancel() + // 2. parent stream is done + case <-sCtx.Done(): + tr.CloseStream(s, grpc.ContextErr(sCtx.Err())) + return + // when http2Client.closeStream is called, stream.Done() would be closed. + // Important: http2Client.closeStream would not lead to stream.ctx canceled. + case <-s.Done(): + // since stream is closed, we just exit without doing anything + return + // For now, t.ctx would not be canceled. + // Pls check pkg/remote/trans/nphttp2/conn_pool for details. + case <-tr.Error(): + tr.CloseStream(s, grpc.ErrConnClosing) + return + } + }) + } + return &clientConn{ tr: tr, s: s,