From 1260faf6ef85321a5418b12b255f5cfb804461b9 Mon Sep 17 00:00:00 2001 From: "Kevin N." <6809505+kevinnoel-be@users.noreply.github.com> Date: Wed, 4 Dec 2024 02:20:49 +0100 Subject: [PATCH] [chore][exporter/googlecloudpubsub] Fix goroutines leak (#36591) #### Description Fixes goroutines leak by properly closing the underlying gRPC client which is only when we're using an insecure custom endpoint. Enables goleak in tests. #### Link to tracking issue Related to https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30438 --- .chloggen/fix-goleak-gcppubsubexporter.yaml | 8 ++ .../googlecloudpubsubexporter/exporter.go | 38 ++---- .../exporter_test.go | 27 ---- .../generated_package_test.go | 6 +- exporter/googlecloudpubsubexporter/go.mod | 3 +- .../googlecloudpubsubexporter/metadata.yaml | 5 +- .../publisher_client.go | 82 ++++++++++++ .../publisher_client_test.go | 126 ++++++++++++++++++ .../testdata/gcp-fake-creds.json | 9 ++ 9 files changed, 242 insertions(+), 62 deletions(-) create mode 100644 .chloggen/fix-goleak-gcppubsubexporter.yaml create mode 100644 exporter/googlecloudpubsubexporter/publisher_client.go create mode 100644 exporter/googlecloudpubsubexporter/publisher_client_test.go create mode 100644 exporter/googlecloudpubsubexporter/testdata/gcp-fake-creds.json diff --git a/.chloggen/fix-goleak-gcppubsubexporter.yaml b/.chloggen/fix-goleak-gcppubsubexporter.yaml new file mode 100644 index 000000000000..9ba9a7d5a5ff --- /dev/null +++ b/.chloggen/fix-goleak-gcppubsubexporter.yaml @@ -0,0 +1,8 @@ +change_type: bug_fix +component: googlecloudpubsubexporter +note: Fix a goroutine leak during shutdown. +issues: [30438] +subtext: | + A goroutine leak was found in the googlecloudpubsubexporter. + The goroutine leak was caused by the exporter not closing the underlying created gRPC client when using an insecure custom endpoint. +change_logs: [] diff --git a/exporter/googlecloudpubsubexporter/exporter.go b/exporter/googlecloudpubsubexporter/exporter.go index c3c5a0ddacfb..420f98fc70f3 100644 --- a/exporter/googlecloudpubsubexporter/exporter.go +++ b/exporter/googlecloudpubsubexporter/exporter.go @@ -10,7 +10,6 @@ import ( "fmt" "time" - pubsub "cloud.google.com/go/pubsub/apiv1" "cloud.google.com/go/pubsub/apiv1/pubsubpb" "github.com/google/uuid" "go.opentelemetry.io/collector/component" @@ -18,16 +17,13 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" - "google.golang.org/api/option" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) const name = "googlecloudpubsub" type pubsubExporter struct { logger *zap.Logger - client *pubsub.PublisherClient + client publisherClient cancel context.CancelFunc userAgent string ceSource string @@ -71,8 +67,7 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error { ctx, ex.cancel = context.WithCancel(ctx) if ex.client == nil { - copts := ex.generateClientOptions() - client, err := pubsub.NewPublisherClient(ctx, copts...) + client, err := newPublisherClient(ctx, ex.config, ex.userAgent) if err != nil { return fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err) } @@ -82,31 +77,14 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error { return nil } -func (ex *pubsubExporter) shutdown(context.Context) error { - if ex.client != nil { - ex.client.Close() - ex.client = nil +func (ex *pubsubExporter) shutdown(_ context.Context) error { + if ex.client == nil { + return nil } - return nil -} -func (ex *pubsubExporter) generateClientOptions() (copts []option.ClientOption) { - if ex.userAgent != "" { - copts = append(copts, option.WithUserAgent(ex.userAgent)) - } - if ex.config.Endpoint != "" { - if ex.config.Insecure { - var dialOpts []grpc.DialOption - if ex.userAgent != "" { - dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent)) - } - conn, _ := grpc.NewClient(ex.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) - copts = append(copts, option.WithGRPCConn(conn)) - } else { - copts = append(copts, option.WithEndpoint(ex.config.Endpoint)) - } - } - return copts + client := ex.client + ex.client = nil + return client.Close() } func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error { diff --git a/exporter/googlecloudpubsubexporter/exporter_test.go b/exporter/googlecloudpubsubexporter/exporter_test.go index 44b057c51a38..54565cddf203 100644 --- a/exporter/googlecloudpubsubexporter/exporter_test.go +++ b/exporter/googlecloudpubsubexporter/exporter_test.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" - "google.golang.org/api/option" ) func TestName(t *testing.T) { @@ -24,32 +23,6 @@ func TestName(t *testing.T) { assert.Equal(t, "googlecloudpubsub", exporter.Name()) } -func TestGenerateClientOptions(t *testing.T) { - // Start a fake server running locally. - srv := pstest.NewServer() - defer srv.Close() - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - exporterConfig := cfg.(*Config) - exporterConfig.Endpoint = srv.Addr - exporterConfig.UserAgent = "test-user-agent" - exporterConfig.Insecure = true - exporterConfig.ProjectID = "my-project" - exporterConfig.Topic = "projects/my-project/topics/otlp" - exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{ - Timeout: 12 * time.Second, - } - exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig) - - options := exporter.generateClientOptions() - assert.Equal(t, option.WithUserAgent("test-user-agent"), options[0]) - - exporter.config.Insecure = false - options = exporter.generateClientOptions() - assert.Equal(t, option.WithUserAgent("test-user-agent"), options[0]) - assert.Equal(t, option.WithEndpoint(srv.Addr), options[1]) -} - func TestExporterDefaultSettings(t *testing.T) { ctx := context.Background() // Start a fake server running locally. diff --git a/exporter/googlecloudpubsubexporter/generated_package_test.go b/exporter/googlecloudpubsubexporter/generated_package_test.go index 4ef5c9f0b7f2..d2986bed895b 100644 --- a/exporter/googlecloudpubsubexporter/generated_package_test.go +++ b/exporter/googlecloudpubsubexporter/generated_package_test.go @@ -3,11 +3,11 @@ package googlecloudpubsubexporter import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - // skipping goleak test as per metadata.yml configuration - os.Exit(m.Run()) + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) } diff --git a/exporter/googlecloudpubsubexporter/go.mod b/exporter/googlecloudpubsubexporter/go.mod index 37c8639e6169..ed1dd321e481 100644 --- a/exporter/googlecloudpubsubexporter/go.mod +++ b/exporter/googlecloudpubsubexporter/go.mod @@ -5,6 +5,7 @@ go 1.22.0 require ( cloud.google.com/go/pubsub v1.45.1 github.com/google/uuid v1.6.0 + github.com/googleapis/gax-go/v2 v2.13.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.114.1-0.20241202231142-b9ff1bc54c99 go.opentelemetry.io/collector/component/componenttest v0.114.1-0.20241202231142-b9ff1bc54c99 @@ -14,6 +15,7 @@ require ( go.opentelemetry.io/collector/exporter v0.114.1-0.20241202231142-b9ff1bc54c99 go.opentelemetry.io/collector/exporter/exportertest v0.114.1-0.20241202231142-b9ff1bc54c99 go.opentelemetry.io/collector/pdata v1.20.1-0.20241202231142-b9ff1bc54c99 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 google.golang.org/api v0.205.0 google.golang.org/grpc v1.67.1 @@ -36,7 +38,6 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.8 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect - github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect diff --git a/exporter/googlecloudpubsubexporter/metadata.yaml b/exporter/googlecloudpubsubexporter/metadata.yaml index d9aceefafc8c..49a90e104b92 100644 --- a/exporter/googlecloudpubsubexporter/metadata.yaml +++ b/exporter/googlecloudpubsubexporter/metadata.yaml @@ -12,4 +12,7 @@ status: tests: skip_lifecycle: true goleak: - skip: true + ignore: + top: + # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. + - "go.opencensus.io/stats/view.(*worker).start" diff --git a/exporter/googlecloudpubsubexporter/publisher_client.go b/exporter/googlecloudpubsubexporter/publisher_client.go new file mode 100644 index 000000000000..959136ef5649 --- /dev/null +++ b/exporter/googlecloudpubsubexporter/publisher_client.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package googlecloudpubsubexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudpubsubexporter" + +import ( + "context" + "fmt" + + pubsub "cloud.google.com/go/pubsub/apiv1" + "cloud.google.com/go/pubsub/apiv1/pubsubpb" + "github.com/googleapis/gax-go/v2" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// publisherClient subset of `pubsub.PublisherClient` +type publisherClient interface { + Close() error + Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) +} + +// wrappedPublisherClient allows to override the close function +type wrappedPublisherClient struct { + publisherClient + closeFn func() error +} + +func (c *wrappedPublisherClient) Close() error { + if c.closeFn != nil { + return c.closeFn() + } + return c.publisherClient.Close() +} + +func newPublisherClient(ctx context.Context, config *Config, userAgent string) (publisherClient, error) { + clientOptions, closeFn, err := generateClientOptions(config, userAgent) + if err != nil { + return nil, fmt.Errorf("failed preparing the gRPC client options to PubSub: %w", err) + } + + client, err := pubsub.NewPublisherClient(ctx, clientOptions...) + if err != nil { + return nil, fmt.Errorf("failed creating the gRPC client to PubSub: %w", err) + } + + if closeFn == nil { + return client, nil + } + + return &wrappedPublisherClient{ + publisherClient: client, + closeFn: closeFn, + }, nil +} + +func generateClientOptions(config *Config, userAgent string) ([]option.ClientOption, func() error, error) { + var copts []option.ClientOption + var closeFn func() error + + if userAgent != "" { + copts = append(copts, option.WithUserAgent(userAgent)) + } + if config.Endpoint != "" { + if config.Insecure { + var dialOpts []grpc.DialOption + if userAgent != "" { + dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent)) + } + client, err := grpc.NewClient(config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) + if err != nil { + return nil, nil, err + } + copts = append(copts, option.WithGRPCConn(client)) + closeFn = client.Close // we need to be able to properly close the grpc client otherwise it'll leak goroutines + } else { + copts = append(copts, option.WithEndpoint(config.Endpoint)) + } + } + return copts, closeFn, nil +} diff --git a/exporter/googlecloudpubsubexporter/publisher_client_test.go b/exporter/googlecloudpubsubexporter/publisher_client_test.go new file mode 100644 index 000000000000..ff3370a5f264 --- /dev/null +++ b/exporter/googlecloudpubsubexporter/publisher_client_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package googlecloudpubsubexporter + +import ( + "context" + "testing" + + pubsub "cloud.google.com/go/pubsub/apiv1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/api/option" +) + +func TestGenerateClientOptions(t *testing.T) { + factory := NewFactory() + + t.Run("defaults", func(t *testing.T) { + cfg := factory.CreateDefaultConfig().(*Config) + cfg.ProjectID = "my-project" + cfg.Topic = "projects/my-project/topics/otlp" + + require.NoError(t, cfg.Validate()) + + gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 6789") + assert.NoError(t, err) + assert.Empty(t, closeConnFn) + + expectedOptions := []option.ClientOption{ + option.WithUserAgent("test-user-agent 6789"), + } + assert.ElementsMatch(t, expectedOptions, gotOptions) + }) + + t.Run("secure custom endpoint", func(t *testing.T) { + cfg := factory.CreateDefaultConfig().(*Config) + cfg.ProjectID = "my-project" + cfg.Topic = "projects/my-project/topics/otlp" + cfg.Endpoint = "defg" + + require.NoError(t, cfg.Validate()) + + gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 4321") + assert.NoError(t, err) + assert.Empty(t, closeConnFn) + + expectedOptions := []option.ClientOption{ + option.WithUserAgent("test-user-agent 4321"), + option.WithEndpoint("defg"), + } + assert.ElementsMatch(t, expectedOptions, gotOptions) + }) + + t.Run("insecure endpoint", func(t *testing.T) { + cfg := factory.CreateDefaultConfig().(*Config) + cfg.ProjectID = "my-project" + cfg.Topic = "projects/my-project/topics/otlp" + cfg.Endpoint = "abcd" + cfg.Insecure = true + + require.NoError(t, cfg.Validate()) + + gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 1234") + assert.NoError(t, err) + assert.NotEmpty(t, closeConnFn) + assert.NoError(t, closeConnFn()) + + require.Len(t, gotOptions, 2) + assert.Equal(t, option.WithUserAgent("test-user-agent 1234"), gotOptions[0]) + assert.IsType(t, option.WithGRPCConn(nil), gotOptions[1]) + }) +} + +func TestNewPublisherClient(t *testing.T) { + // The publisher client checks for credentials during init + t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "testdata/gcp-fake-creds.json") + + ctx := context.Background() + factory := NewFactory() + + t.Run("defaults", func(t *testing.T) { + cfg := factory.CreateDefaultConfig().(*Config) + cfg.ProjectID = "my-project" + cfg.Topic = "projects/my-project/topics/otlp" + + require.NoError(t, cfg.Validate()) + + client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789") + assert.NoError(t, err) + require.NotEmpty(t, client) + assert.IsType(t, &pubsub.PublisherClient{}, client) + assert.NoError(t, client.Close()) + }) + + t.Run("secure custom endpoint", func(t *testing.T) { + cfg := factory.CreateDefaultConfig().(*Config) + cfg.ProjectID = "my-project" + cfg.Topic = "projects/my-project/topics/otlp" + cfg.Endpoint = "xyz" + + require.NoError(t, cfg.Validate()) + + client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789") + assert.NoError(t, err) + require.NotEmpty(t, client) + assert.IsType(t, &pubsub.PublisherClient{}, client) + assert.NoError(t, client.Close()) + }) + + t.Run("insecure endpoint", func(t *testing.T) { + cfg := factory.CreateDefaultConfig().(*Config) + cfg.ProjectID = "my-project" + cfg.Topic = "projects/my-project/topics/otlp" + cfg.Endpoint = "abc" + cfg.Insecure = true + + require.NoError(t, cfg.Validate()) + + client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789") + assert.NoError(t, err) + require.NotEmpty(t, client) + assert.IsType(t, &wrappedPublisherClient{}, client) + assert.NoError(t, client.Close()) + }) +} diff --git a/exporter/googlecloudpubsubexporter/testdata/gcp-fake-creds.json b/exporter/googlecloudpubsubexporter/testdata/gcp-fake-creds.json new file mode 100644 index 000000000000..f3eac606f048 --- /dev/null +++ b/exporter/googlecloudpubsubexporter/testdata/gcp-fake-creds.json @@ -0,0 +1,9 @@ +{ + "type": "service_account", + "private_key_id": "abc", + "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\nNtBLSfcqPULqD+h7br9lEJio\n-----END PRIVATE KEY-----\n", + "client_email": "123-abc@developer.gserviceaccount.com", + "client_id": "123-abc.apps.googleusercontent.com", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "http://localhost:8080/token" +} \ No newline at end of file