diff --git a/CHANGELOG.md b/CHANGELOG.md index a88da1ee8..51f05d457 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fixed goroutines leak within topic reader on network problems + ## v3.67.0 * Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response * Added type assertion checks to enhance type safety and prevent unexpected panics in critical sections of the codebase diff --git a/internal/background/worker.go b/internal/background/worker.go index c41ad4b0a..7c1389b04 100644 --- a/internal/background/worker.go +++ b/internal/background/worker.go @@ -20,6 +20,7 @@ var ( // A Worker must not be copied after first use type Worker struct { ctx context.Context //nolint:containedctx + name string workers sync.WaitGroup closeReason error tasksCompleted empty.Chan @@ -32,8 +33,10 @@ type Worker struct { type CallbackFunc func(ctx context.Context) -func NewWorker(parent context.Context) *Worker { - w := Worker{} +func NewWorker(parent context.Context, name string) *Worker { + w := Worker{ + name: name, + } w.ctx, w.stop = xcontext.WithCancel(parent) return &w @@ -122,11 +125,14 @@ func (b *Worker) init() { } b.tasks = make(chan backgroundTask) b.tasksCompleted = make(empty.Chan) - go b.starterLoop() + + pprof.Do(b.ctx, pprof.Labels("worker-name", b.name), func(ctx context.Context) { + go b.starterLoop(ctx) + }) }) } -func (b *Worker) starterLoop() { +func (b *Worker) starterLoop(ctx context.Context) { defer close(b.tasksCompleted) for bgTask := range b.tasks { @@ -135,7 +141,7 @@ func (b *Worker) starterLoop() { go func(task backgroundTask) { defer b.workers.Done() - pprof.Do(b.ctx, pprof.Labels("background", task.name), task.callback) + pprof.Do(ctx, pprof.Labels("background", task.name), task.callback) }(bgTask) } } diff --git a/internal/background/worker_test.go b/internal/background/worker_test.go index 2a188fb00..c149da4fb 100644 --- a/internal/background/worker_test.go +++ b/internal/background/worker_test.go @@ -25,7 +25,7 @@ func TestWorkerContext(t *testing.T) { t.Run("Dedicated", func(t *testing.T) { type ctxkey struct{} ctx := context.WithValue(context.Background(), ctxkey{}, "2") - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) require.Equal(t, "2", w.Context().Value(ctxkey{})) }) @@ -41,7 +41,7 @@ func TestWorkerContext(t *testing.T) { func TestWorkerStart(t *testing.T) { t.Run("Started", func(t *testing.T) { - w := NewWorker(xtest.Context(t)) + w := NewWorker(xtest.Context(t), "test-worker, "+t.Name()) started := make(empty.Chan) w.Start("test", func(ctx context.Context) { close(started) @@ -50,7 +50,7 @@ func TestWorkerStart(t *testing.T) { }) t.Run("Stopped", func(t *testing.T) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) _ = w.Close(ctx, nil) started := make(empty.Chan) @@ -72,7 +72,7 @@ func TestWorkerStart(t *testing.T) { func TestWorkerClose(t *testing.T) { t.Run("StopBackground", func(t *testing.T) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) started := make(empty.Chan) stopped := atomic.Bool{} @@ -89,7 +89,7 @@ func TestWorkerClose(t *testing.T) { t.Run("DoubleClose", func(t *testing.T) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) require.NoError(t, w.Close(ctx, nil)) require.Error(t, w.Close(ctx, nil)) }) @@ -104,7 +104,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) { var counter atomic.Int64 ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) stopNewStarts := atomic.Bool{} var wgStarters sync.WaitGroup @@ -144,7 +144,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) { func TestWorkerStartCompletedWhileLongWait(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { ctx := xtest.Context(t) - w := NewWorker(ctx) + w := NewWorker(ctx, "test-worker, "+t.Name()) allowStop := make(empty.Chan) closeStarted := make(empty.Chan) diff --git a/internal/topic/topicreaderinternal/committer.go b/internal/topic/topicreaderinternal/committer.go index f0fa1c425..67940df1d 100644 --- a/internal/topic/topicreaderinternal/committer.go +++ b/internal/topic/topicreaderinternal/committer.go @@ -3,6 +3,7 @@ package topicreaderinternal import ( "context" "errors" + "fmt" "sync/atomic" "time" @@ -52,16 +53,15 @@ type committer struct { commits CommitRanges } -func newCommitter(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc) *committer { //nolint:lll,revive +func newCommitterStopped(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc, readerID int64) *committer { //nolint:lll,revive res := &committer{ mode: mode, clock: clockwork.NewRealClock(), send: send, - backgroundWorker: *background.NewWorker(lifeContext), + backgroundWorker: *background.NewWorker(lifeContext, fmt.Sprintf("ydb-topic-reader-committer: %v", readerID)), tracer: tracer, } res.initChannels() - res.start() return res } @@ -70,7 +70,7 @@ func (c *committer) initChannels() { c.commitLoopSignal = make(empty.Chan, 1) } -func (c *committer) start() { +func (c *committer) Start() { c.backgroundWorker.Start("commit pusher", c.pushCommitsLoop) } diff --git a/internal/topic/topicreaderinternal/committer_test.go b/internal/topic/topicreaderinternal/committer_test.go index 7bf444d62..2c3b9d2af 100644 --- a/internal/topic/topicreaderinternal/committer_test.go +++ b/internal/topic/topicreaderinternal/committer_test.go @@ -377,11 +377,12 @@ func TestCommitterBuffer(t *testing.T) { } func newTestCommitter(ctx context.Context, t testing.TB) *committer { - res := newCommitter(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error { + res := newCommitterStopped(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error { return nil - }) + }, -1) + res.Start() t.Cleanup(func() { - if err := res.Close(ctx, errors.New("test comitter closed")); err != nil { + if err := res.Close(ctx, errors.New("test committer closed")); err != nil { require.ErrorIs(t, err, background.ErrAlreadyClosed) } }) diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index 8eb91e0ac..8e80575b9 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -123,7 +123,7 @@ func newTopicStreamReader( if err = reader.initSession(); err != nil { return nil, err } - if err = reader.startLoops(); err != nil { + if err = reader.startBackgroundWorkers(); err != nil { return nil, err } @@ -150,13 +150,17 @@ func newTopicStreamReaderStopped( stream: &syncedStream{stream: stream}, cancel: cancel, batcher: newBatcher(), - backgroundWorkers: *background.NewWorker(stopPump), readConnectionID: "preinitID-" + readerConnectionID.String(), readerID: readerID, rawMessagesFromBuffer: make(chan rawtopicreader.ServerMessage, 1), } - res.committer = newCommitter(cfg.Trace, labeledContext, cfg.CommitMode, res.send) + res.backgroundWorkers = *background.NewWorker(stopPump, fmt.Sprintf( + "topic-reader-stream-background: %v", + res.readerID, + )) + + res.committer = newCommitterStopped(cfg.Trace, labeledContext, cfg.CommitMode, res.send, res.readerID) res.committer.BufferTimeLagTrigger = cfg.CommitterBatchTimeLag res.committer.BufferCountTrigger = cfg.CommitterBatchCounterTrigger res.sessionController.init() @@ -413,11 +417,13 @@ func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error { return err } -func (r *topicStreamReaderImpl) startLoops() error { +func (r *topicStreamReaderImpl) startBackgroundWorkers() error { if err := r.setStarted(); err != nil { return err } + r.committer.Start() + r.backgroundWorkers.Start("readMessagesLoop", r.readMessagesLoop) r.backgroundWorkers.Start("dataRequestLoop", r.dataRequestLoop) r.backgroundWorkers.Start("updateTokenLoop", r.updateTokenLoop) diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index 22fb2997a..b7ce7b9d0 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -1055,7 +1055,7 @@ func newTopicReaderTestEnv(t testing.TB) streamEnv { } func (e *streamEnv) Start() { - require.NoError(e.t, e.reader.startLoops()) + require.NoError(e.t, e.reader.startBackgroundWorkers()) xtest.SpinWaitCondition(e.t, nil, func() bool { return e.reader.restBufferSizeBytes.Load() == e.initialBufferSizeBytes }) diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index f43044624..403c81f93 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -15,7 +15,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -24,6 +23,7 @@ import ( var ( errReconnectRequestOutdated = xerrors.Wrap(errors.New("ydb: reconnect request outdated")) errReconnect = xerrors.Wrap(errors.New("ydb: reconnect to topic grpc stream")) + errConnectionTimeout = xerrors.Wrap(errors.New("ydb: topic reader connection timeout for stream")) ) type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error) @@ -33,6 +33,7 @@ type readerReconnector struct { clock clockwork.Clock retrySettings topic.RetrySettings streamVal batchedStreamReader + streamContextCancel context.CancelCauseFunc streamErr error closedErr error initErr error @@ -148,6 +149,7 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error if r.streamVal != nil { streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed)) + r.streamContextCancel(errReaderClosed) if closeErr == nil { closeErr = streamCloseErr } @@ -267,7 +269,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead _ = oldReader.CloseWithError(ctx, xerrors.WithStackTrace(errReconnect)) } - newStream, err := r.connectWithTimeout() + newStream, newStreamClose, err := r.connectWithTimeout() if r.isRetriableError(err) { go func(reason error) { @@ -281,6 +283,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead r.streamErr = err if err == nil { r.streamVal = newStream + r.streamContextCancel = newStreamClose if !r.initDone { r.initDone = true close(r.initDoneCh) @@ -304,14 +307,14 @@ func (r *readerReconnector) checkErrRetryMode(err error, retriesDuration time.Du return topic.CheckRetryMode(err, r.retrySettings, retriesDuration) } -func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err error) { +func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, _ context.CancelCauseFunc, err error) { bgContext := r.background.Context() if err = bgContext.Err(); err != nil { - return nil, err + return nil, nil, err } - connectionContext, cancel := xcontext.WithCancel(context.Background()) + connectionContext, cancel := context.WithCancelCause(context.WithoutCancel(bgContext)) type connectResult struct { stream batchedStreamReader @@ -332,17 +335,17 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err case <-connectionTimoutTimer.Chan(): // cancel connection context only if timeout exceed while connection // because if cancel context after connect - it will break - cancel() + cancel(xerrors.WithStackTrace(errConnectionTimeout)) res = <-result case res = <-result: // pass } if res.err == nil { - return res.stream, nil + return res.stream, cancel, nil } - return nil, res.err + return nil, nil, res.err } func (r *readerReconnector) WaitInit(ctx context.Context) error { diff --git a/internal/topic/topicreaderinternal/stream_reconnector_test.go b/internal/topic/topicreaderinternal/stream_reconnector_test.go index d9034066e..6583014da 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector_test.go +++ b/internal/topic/topicreaderinternal/stream_reconnector_test.go @@ -34,8 +34,9 @@ func TestTopicReaderReconnectorReadMessageBatch(t *testing.T) { baseReader.EXPECT().ReadMessageBatch(gomock.Any(), opts).Return(batch, nil) reader := &readerReconnector{ - streamVal: baseReader, - tracer: &trace.Topic{}, + streamVal: baseReader, + streamContextCancel: func(cause error) {}, + tracer: &trace.Topic{}, } reader.initChannelsAndClock() res, err := reader.ReadMessageBatch(context.Background(), opts) @@ -163,7 +164,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) { require.Equal(t, "v", ctx.Value(k{})) require.Equal(t, expectedCommitRange, offset) }) - reconnector := &readerReconnector{streamVal: stream, tracer: &trace.Topic{}} + reconnector := &readerReconnector{ + streamVal: stream, + streamContextCancel: func(cause error) {}, + tracer: &trace.Topic{}, + } reconnector.initChannelsAndClock() require.NoError(t, reconnector.Commit(ctx, expectedCommitRange)) }) @@ -174,7 +179,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) { require.Equal(t, "v", ctx.Value(k{})) require.Equal(t, expectedCommitRange, offset) }).Return(testErr) - reconnector := &readerReconnector{streamVal: stream, tracer: &trace.Topic{}} + reconnector := &readerReconnector{ + streamVal: stream, + streamContextCancel: func(cause error) {}, + tracer: &trace.Topic{}, + } reconnector.initChannelsAndClock() require.ErrorIs(t, reconnector.Commit(ctx, expectedCommitRange), testErr) }) @@ -209,7 +218,7 @@ func TestTopicReaderReconnectorConnectionLoop(t *testing.T) { reconnector := &readerReconnector{ connectTimeout: value.InfiniteDuration, - background: *background.NewWorker(ctx), + background: *background.NewWorker(ctx, "test-worker, "+t.Name()), tracer: &trace.Topic{}, } reconnector.initChannelsAndClock() diff --git a/internal/topic/topicwriterinternal/writer_options.go b/internal/topic/topicwriterinternal/writer_options.go index 5c88bdbb3..215ff46d3 100644 --- a/internal/topic/topicwriterinternal/writer_options.go +++ b/internal/topic/topicwriterinternal/writer_options.go @@ -3,6 +3,8 @@ package topicwriterinternal import ( "time" + "github.com/jonboulle/clockwork" + "github.com/ydb-platform/ydb-go-sdk/v3/credentials" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" @@ -152,3 +154,10 @@ func WithTopic(topic string) PublicWriterOption { cfg.topic = topic } } + +// WithClock is private option for tests +func WithClock(clock clockwork.Clock) PublicWriterOption { + return func(cfg *WriterReconnectorConfig) { + cfg.clock = clock + } +} diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 66d646291..01ce7433c 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -118,7 +118,6 @@ type WriterReconnector struct { queue messageQueue background background.Worker retrySettings topic.RetrySettings - clock clockwork.Clock writerInstanceID string sessionID string semaphore *semaphore.Weighted @@ -149,7 +148,6 @@ func newWriterReconnectorStopped( cfg: cfg, semaphore: semaphore.NewWeighted(int64(cfg.MaxQueueLen)), queue: newMessageQueue(), - clock: clockwork.NewRealClock(), lastSeqNo: -1, firstInitResponseProcessedChan: make(empty.Chan), encodersMap: NewEncoderMap(), @@ -189,7 +187,7 @@ func (w *WriterReconnector) fillFields(messages []messageWithDataContent) error if w.cfg.AutoSetCreatedTime { if msg.CreatedAt.IsZero() { if now.IsZero() { - now = w.clock.Now() + now = w.cfg.clock.Now() } msg.CreatedAt = now } else { @@ -391,17 +389,17 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) { now := time.Now() if startOfRetries.IsZero() || topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) { attempt = 0 - startOfRetries = w.clock.Now() + startOfRetries = w.cfg.clock.Now() } else { attempt++ } prevAttemptTime = now if reconnectReason != nil { - retryDuration := w.clock.Since(startOfRetries) + retryDuration := w.cfg.clock.Since(startOfRetries) if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry { delay := backoff.Delay(attempt) - delayTimer := w.clock.NewTimer(delay) + delayTimer := w.cfg.clock.NewTimer(delay) select { case <-doneCtx: delayTimer.Stop() diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 70630d74d..1675fedbb 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math" "sort" "sync" "sync/atomic" @@ -426,7 +427,7 @@ func TestWriterImpl_Reconnect(t *testing.T) { xtest.TestManyTimesWithName(t, "ReconnectOnErrors", func(t testing.TB) { ctx := xtest.Context(t) - w := newTestWriterStopped() + w := newTestWriterStopped(WithClock(xtest.FastClock(t)), WithTokenUpdateInterval(time.Duration(math.MaxInt64))) mc := gomock.NewController(t) diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index 4f01c56d1..4643ad3dc 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -82,8 +82,11 @@ func newSingleStreamWriterStopped( cfg SingleStreamWriterConfig, //nolint:gocritic ) *SingleStreamWriter { return &SingleStreamWriter{ - cfg: cfg, - background: *background.NewWorker(xcontext.ValueOnly(ctxForPProfLabelsOnly)), + cfg: cfg, + background: *background.NewWorker(xcontext.ValueOnly(ctxForPProfLabelsOnly), fmt.Sprintf( + "ydb-topic-stream-writer-background: %v", + cfg.reconnectorInstanceID, + )), closeCompleted: make(empty.Chan), } } diff --git a/internal/xtest/clock.go b/internal/xtest/clock.go new file mode 100644 index 000000000..2cc2b9a6d --- /dev/null +++ b/internal/xtest/clock.go @@ -0,0 +1,37 @@ +package xtest + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/jonboulle/clockwork" +) + +// FastClock returns fake clock with very fast time speed advanced until end of test +// the clock stops advance at end of test +func FastClock(t testing.TB) clockwork.FakeClock { + clock := clockwork.NewFakeClock() + var needStop atomic.Bool + clockStopped := make(chan struct{}) + + go func() { + defer close(clockStopped) + + for { + if needStop.Load() { + return + } + + clock.Advance(time.Second) + time.Sleep(time.Microsecond) + } + }() + + t.Cleanup(func() { + needStop.Store(true) + <-clockStopped + }) + + return clock +} diff --git a/internal/xtest/manytimes.go b/internal/xtest/manytimes.go index 4dc7f1ccb..9d021eb23 100644 --- a/internal/xtest/manytimes.go +++ b/internal/xtest/manytimes.go @@ -38,10 +38,11 @@ func TestManyTimes(t testing.TB, test TestFunc, opts ...TestManyTimesOption) { } start := time.Now() + var testMutex sync.Mutex for { testCounter++ // run test, then check stopAfter for guarantee run test least once - runTest(t, test) + runTest(t, test, &testMutex) if time.Since(start) > options.stopAfter || t.Failed() { return @@ -60,11 +61,12 @@ func TestManyTimesWithName(t *testing.T, name string, test TestFunc) { type TestFunc func(t testing.TB) -func runTest(t testing.TB, test TestFunc) { +func runTest(t testing.TB, test TestFunc, testMutex *sync.Mutex) { t.Helper() tw := &testWrapper{ TB: t, + m: testMutex, } defer tw.doCleanup() @@ -75,7 +77,7 @@ func runTest(t testing.TB, test TestFunc) { type testWrapper struct { testing.TB - m sync.Mutex + m *sync.Mutex logs []logRecord cleanup []func() }