diff --git a/.golangci.yml b/.golangci.yml index e2100ce23..102d0e3ed 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -218,7 +218,6 @@ linters-settings: linters: enable-all: true disable: - - containedctx - contextcheck - cyclop - depguard diff --git a/CHANGELOG.md b/CHANGELOG.md index a24369100..ac6c9f81d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Refactored internals for enabling `containedctx` linter + ## v3.65.3 * Fixed data race in `internal/conn.grpcClientStream` diff --git a/driver.go b/driver.go index 592466443..170e2f86d 100644 --- a/driver.go +++ b/driver.go @@ -51,7 +51,6 @@ var _ Connection = (*Driver)(nil) // Driver type provide access to YDB service clients type Driver struct { - ctx context.Context // cancel while Driver.Close called. ctxCancel context.CancelFunc userInfo *dsn.UserInfo @@ -311,7 +310,6 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e d := &Driver{ children: make(map[uint64]*Driver), - ctx: ctx, ctxCancel: driverCtxCancel, } diff --git a/internal/background/worker.go b/internal/background/worker.go index 91c0d1686..c41ad4b0a 100644 --- a/internal/background/worker.go +++ b/internal/background/worker.go @@ -19,7 +19,7 @@ var ( // A Worker must not be copied after first use type Worker struct { - ctx context.Context + ctx context.Context //nolint:containedctx workers sync.WaitGroup closeReason error tasksCompleted empty.Chan diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 8192e38e7..d7e05bd42 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -462,7 +462,6 @@ func (c *conn) NewStream( return &grpcClientStream{ ClientStream: s, - ctx: ctx, c: c, wrapping: useWrapping, traceID: traceID, diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index 96ab744fc..ea7c08cfe 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -16,7 +16,6 @@ import ( type grpcClientStream struct { grpc.ClientStream - ctx context.Context c *conn wrapping bool traceID string @@ -25,9 +24,11 @@ type grpcClientStream struct { } func (s *grpcClientStream) CloseSend() (err error) { - ctx := s.ctx - onDone := trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"), + var ( + ctx = s.Context() + onDone = trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"), + ) ) defer func() { onDone(err) @@ -60,9 +61,11 @@ func (s *grpcClientStream) CloseSend() (err error) { } func (s *grpcClientStream) SendMsg(m interface{}) (err error) { - ctx := s.ctx - onDone := trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"), + var ( + ctx = s.Context() + onDone = trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"), + ) ) defer func() { onDone(err) @@ -79,7 +82,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { } defer func() { - s.c.onTransportError(s.Context(), err) + s.c.onTransportError(ctx, err) }() if s.wrapping { @@ -103,9 +106,11 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { } func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { - ctx := s.ctx - onDone := trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"), + var ( + ctx = s.Context() + onDone = trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"), + ) ) defer func() { onDone(err) @@ -130,7 +135,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { defer func() { if !xerrors.Is(err, io.EOF) { - s.c.onTransportError(s.Context(), err) + s.c.onTransportError(ctx, err) } }() diff --git a/internal/coordination/client_test.go b/internal/coordination/client_test.go index d7778be95..dd1b46f76 100644 --- a/internal/coordination/client_test.go +++ b/internal/coordination/client_test.go @@ -128,7 +128,7 @@ func TestDescribeNodeRequest(t *testing.T) { func TestOperationParams(t *testing.T) { for _, tt := range []struct { name string - ctx context.Context + ctx context.Context //nolint:containedctx config interface { OperationTimeout() time.Duration OperationCancelAfter() time.Duration diff --git a/internal/coordination/session.go b/internal/coordination/session.go index 9e7e2bb23..5c6a17ac5 100644 --- a/internal/coordination/session.go +++ b/internal/coordination/session.go @@ -23,7 +23,7 @@ type session struct { options *options.CreateSessionOptions client *Client - ctx context.Context + ctx context.Context //nolint:containedctx cancel context.CancelFunc sessionClosedChan chan struct{} controller *conversation.Controller @@ -37,7 +37,7 @@ type session struct { type lease struct { session *session name string - ctx context.Context + ctx context.Context //nolint:containedctx cancel context.CancelFunc } diff --git a/internal/meta/context_test.go b/internal/meta/context_test.go index 1bfdabcc5..6d3ae817e 100644 --- a/internal/meta/context_test.go +++ b/internal/meta/context_test.go @@ -11,7 +11,7 @@ import ( func TestContext(t *testing.T) { for _, tt := range []struct { name string - ctx context.Context + ctx context.Context //nolint:containedctx header string values []string }{ diff --git a/internal/operation/params_test.go b/internal/operation/params_test.go index 605453279..74b136b96 100644 --- a/internal/operation/params_test.go +++ b/internal/operation/params_test.go @@ -14,7 +14,7 @@ import ( func TestParams(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx preferContextTimeout bool timeout time.Duration cancelAfter time.Duration diff --git a/internal/query/row.go b/internal/query/row.go index 476b6aa14..a9db3f61b 100644 --- a/internal/query/row.go +++ b/internal/query/row.go @@ -14,7 +14,7 @@ import ( var _ query.Row = (*row)(nil) type row struct { - ctx context.Context + ctx context.Context //nolint:containedctx trace *trace.Query indexedScanner scanner.IndexedScanner @@ -35,8 +35,11 @@ func newRow(ctx context.Context, columns []*Ydb.Column, v *Ydb.Value, t *trace.Q } func (r row) Scan(dst ...interface{}) (err error) { - onDone := trace.QueryOnRowScan(r.trace, &r.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"), + var ( + ctx = r.ctx + onDone = trace.QueryOnRowScan(r.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"), + ) ) defer func() { onDone(err) @@ -46,8 +49,11 @@ func (r row) Scan(dst ...interface{}) (err error) { } func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) { - onDone := trace.QueryOnRowScanNamed(r.trace, &r.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"), + var ( + ctx = r.ctx + onDone = trace.QueryOnRowScanNamed(r.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"), + ) ) defer func() { onDone(err) @@ -57,8 +63,11 @@ func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) { } func (r row) ScanStruct(dst interface{}, opts ...scanner.ScanStructOption) (err error) { - onDone := trace.QueryOnRowScanStruct(r.trace, &r.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"), + var ( + ctx = r.ctx + onDone = trace.QueryOnRowScanStruct(r.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"), + ) ) defer func() { onDone(err) diff --git a/internal/table/scanner/result_test.go b/internal/table/scanner/result_test.go index dc81b0203..b7619cc11 100644 --- a/internal/table/scanner/result_test.go +++ b/internal/table/scanner/result_test.go @@ -207,7 +207,7 @@ func NewResultSet(a *allocator.Allocator, opts ...ResultSetOption) *Ydb.ResultSe func TestNewStreamWithRecvFirstResultSet(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx recvCounter int err error }{ diff --git a/internal/topic/topicreaderinternal/partition_session.go b/internal/topic/topicreaderinternal/partition_session.go index 9e23c5495..781de1782 100644 --- a/internal/topic/topicreaderinternal/partition_session.go +++ b/internal/topic/topicreaderinternal/partition_session.go @@ -24,7 +24,7 @@ type partitionSession struct { readerID int64 connectionID string - ctx context.Context + ctx context.Context //nolint:containedctx ctxCancel context.CancelFunc partitionSessionID rawtopicreader.PartitionSessionID diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index b499221d1..bc2dc865c 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -102,7 +102,6 @@ func NewReader( cfg.OperationTimeout(), cfg.RetrySettings, cfg.Trace, - cfg.BaseContext, ), defaultBatchConfig: cfg.DefaultBatchConfig, tracer: cfg.Trace, diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index fec186ffc..8eb91e0ac 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -34,7 +34,7 @@ type partitionSessionID = rawtopicreader.PartitionSessionID type topicStreamReaderImpl struct { cfg topicStreamReaderConfig - ctx context.Context + ctx context.Context //nolint:containedctx cancel context.CancelFunc freeBytes chan int @@ -60,7 +60,7 @@ type topicStreamReaderImpl struct { type topicStreamReaderConfig struct { CommitterBatchTimeLag time.Duration CommitterBatchCounterTrigger int - BaseContext context.Context + BaseContext context.Context //nolint:containedctx BufferSizeProtoBytes int Cred credentials.Credentials CredUpdateInterval time.Duration @@ -179,7 +179,7 @@ func (r *topicStreamReaderImpl) ReadMessageBatch( ) (batch *PublicBatch, err error) { onDone := trace.TopicOnReaderReadMessages( r.cfg.Trace, - ctx, + &ctx, opts.MinCount, opts.MaxCount, r.getRestBufferBytes(), @@ -295,15 +295,18 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer( return err } - onDone := trace.TopicOnReaderPartitionReadStopResponse( - r.cfg.Trace, - r.readConnectionID, - session.Context(), - session.Topic, - session.PartitionID, - session.partitionSessionID.ToInt64(), - msg.CommittedOffset.ToInt64(), - msg.Graceful, + var ( + ctx = session.Context() + onDone = trace.TopicOnReaderPartitionReadStopResponse( + r.cfg.Trace, + r.readConnectionID, + &ctx, + session.Topic, + session.PartitionID, + session.partitionSessionID.ToInt64(), + msg.CommittedOffset.ToInt64(), + msg.Graceful, + ) ) defer func() { onDone(err) @@ -357,7 +360,7 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRa session := commitRange.partitionSession onDone := trace.TopicOnReaderCommit( r.cfg.Trace, - ctx, + &ctx, session.Topic, session.PartitionID, session.partitionSessionID.ToInt64(), @@ -768,13 +771,16 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer( return err } - onDone := trace.TopicOnReaderPartitionReadStartResponse( - r.cfg.Trace, - r.readConnectionID, - session.Context(), - session.Topic, - session.PartitionID, - session.partitionSessionID.ToInt64(), + var ( + ctx = session.Context() + onDone = trace.TopicOnReaderPartitionReadStartResponse( + r.cfg.Trace, + r.readConnectionID, + &ctx, + session.Topic, + session.PartitionID, + session.partitionSessionID.ToInt64(), + ) ) respMessage := &rawtopicreader.StartPartitionSessionResponse{ diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index a518c2058..22fb2997a 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -379,7 +379,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll expected := trace.TopicReaderPartitionReadStopResponseStartInfo{ ReaderConnectionID: e.reader.readConnectionID, - PartitionContext: e.partitionSession.ctx, + PartitionContext: &e.partitionSession.ctx, Topic: e.partitionSession.Topic, PartitionID: e.partitionSession.PartitionID, PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(), @@ -388,7 +388,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { } require.Equal(t, expected, info) - require.NoError(t, info.PartitionContext.Err()) + require.NoError(t, (*info.PartitionContext).Err()) readMessagesCtxCancel() @@ -424,7 +424,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll expected := trace.TopicReaderPartitionReadStopResponseStartInfo{ ReaderConnectionID: e.reader.readConnectionID, - PartitionContext: e.partitionSession.ctx, + PartitionContext: &e.partitionSession.ctx, Topic: e.partitionSession.Topic, PartitionID: e.partitionSession.PartitionID, PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(), @@ -432,7 +432,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { Graceful: false, } require.Equal(t, expected, info) - require.Error(t, info.PartitionContext.Err()) + require.Error(t, (*info.PartitionContext).Err()) readMessagesCtxCancel() @@ -954,7 +954,7 @@ func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) { } type streamEnv struct { - ctx context.Context + ctx context.Context //nolint:containedctx t testing.TB reader *topicStreamReaderImpl stopReadEvents empty.Chan diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index ec601ba67..f43044624 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -31,7 +31,6 @@ type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error) type readerReconnector struct { background background.Worker clock clockwork.Clock - baseContext context.Context retrySettings topic.RetrySettings streamVal batchedStreamReader streamErr error @@ -49,14 +48,12 @@ type readerReconnector struct { initDone bool } -//nolint:revive func newReaderReconnector( readerID int64, connector readerConnectFunc, connectTimeout time.Duration, retrySettings topic.RetrySettings, tracer *trace.Topic, - baseContext context.Context, ) *readerReconnector { res := &readerReconnector{ readerID: readerID, @@ -65,7 +62,6 @@ func newReaderReconnector( streamErr: errUnconnected, connectTimeout: connectTimeout, tracer: tracer, - baseContext: baseContext, retrySettings: retrySettings, } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 4bf18c6ff..68eae0466 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -821,7 +821,7 @@ func isClosed(ch <-chan struct{}) bool { } type testEnv struct { - ctx context.Context + ctx context.Context //nolint:containedctx stream *MockRawTopicWriterStream writer *WriterReconnector sendFromServerChannel chan sendFromServerResponse diff --git a/internal/xcontext/context_with_cancel.go b/internal/xcontext/context_with_cancel.go index 1deeac4bb..f8cc68923 100644 --- a/internal/xcontext/context_with_cancel.go +++ b/internal/xcontext/context_with_cancel.go @@ -16,8 +16,8 @@ func WithCancel(ctx context.Context) (context.Context, context.CancelFunc) { } type cancelCtx struct { - parentCtx context.Context - ctx context.Context + parentCtx context.Context //nolint:containedctx + ctx context.Context //nolint:containedctx ctxCancel context.CancelFunc m sync.Mutex diff --git a/internal/xcontext/context_with_timeout.go b/internal/xcontext/context_with_timeout.go index 5342798fe..b120fbd85 100644 --- a/internal/xcontext/context_with_timeout.go +++ b/internal/xcontext/context_with_timeout.go @@ -2,6 +2,7 @@ package xcontext import ( "context" + "errors" "sync" "time" @@ -19,8 +20,8 @@ func WithTimeout(ctx context.Context, t time.Duration) (context.Context, context } type timeoutCtx struct { - parentCtx context.Context - ctx context.Context + parentCtx context.Context //nolint:containedctx + ctx context.Context //nolint:containedctx ctxCancel context.CancelFunc from string @@ -44,7 +45,7 @@ func (ctx *timeoutCtx) Err() error { return ctx.err } - if ctx.ctx.Err() == context.DeadlineExceeded && ctx.parentCtx.Err() == nil { //nolint:errorlint + if errors.Is(ctx.ctx.Err(), context.DeadlineExceeded) && ctx.parentCtx.Err() == nil { ctx.err = errFrom(context.DeadlineExceeded, ctx.from) return ctx.err diff --git a/internal/xcontext/without_deadline.go b/internal/xcontext/without_deadline.go index 2e80a284f..4e14655f9 100644 --- a/internal/xcontext/without_deadline.go +++ b/internal/xcontext/without_deadline.go @@ -5,7 +5,9 @@ import ( "time" ) -type valueOnlyContext struct{ context.Context } +type valueOnlyContext struct { + context.Context //nolint:containedctx +} func (valueOnlyContext) Deadline() (deadline time.Time, ok bool) { return } diff --git a/internal/xsql/conn.go b/internal/xsql/conn.go index c5d670517..71404066a 100644 --- a/internal/xsql/conn.go +++ b/internal/xsql/conn.go @@ -68,7 +68,7 @@ func withTrace(t *trace.DatabaseSQL) connOption { type beginTxFunc func(ctx context.Context, txOptions driver.TxOptions) (currentTx, error) type conn struct { - openConnCtx context.Context + ctx context.Context //nolint:containedctx connector *Connector trace *trace.DatabaseSQL @@ -129,9 +129,9 @@ var ( func newConn(ctx context.Context, c *Connector, s table.ClosableSession, opts ...connOption) *conn { cc := &conn{ - openConnCtx: ctx, - connector: c, - session: s, + ctx: ctx, + connector: c, + session: s, } cc.beginTxFuncs = map[QueryMode]beginTxFunc{ DataQueryMode: cc.beginTx, @@ -169,7 +169,7 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, return &stmt{ conn: c, processor: c, - stmtCtx: ctx, + ctx: ctx, query: query, trace: c.trace, }, nil @@ -414,9 +414,12 @@ func (c *conn) Ping(ctx context.Context) (finalErr error) { func (c *conn) Close() (finalErr error) { if c.closed.CompareAndSwap(false, true) { c.connector.detach(c) - onDone := trace.DatabaseSQLOnConnClose( - c.trace, &c.openConnCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).Close"), + var ( + ctx = c.ctx + onDone = trace.DatabaseSQLOnConnClose( + c.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).Close"), + ) ) defer func() { onDone(finalErr) @@ -424,7 +427,7 @@ func (c *conn) Close() (finalErr error) { if c.currentTx != nil { _ = c.currentTx.Rollback() } - err := c.session.Close(xcontext.ValueOnly(c.openConnCtx)) + err := c.session.Close(xcontext.ValueOnly(ctx)) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } diff --git a/internal/xsql/stmt.go b/internal/xsql/stmt.go index 75b571acd..6909e55ef 100644 --- a/internal/xsql/stmt.go +++ b/internal/xsql/stmt.go @@ -17,8 +17,8 @@ type stmt struct { driver.ExecerContext driver.QueryerContext } - query string - stmtCtx context.Context + query string + ctx context.Context //nolint:containedctx trace *trace.DatabaseSQL } @@ -29,51 +29,54 @@ var ( _ driver.StmtExecContext = &stmt{} ) -func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ driver.Rows, finalErr error) { - onDone := trace.DatabaseSQLOnStmtQuery(s.trace, &ctx, +func (stmt *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ driver.Rows, finalErr error) { + onDone := trace.DatabaseSQLOnStmtQuery(stmt.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).QueryContext"), - s.stmtCtx, s.query, + stmt.ctx, stmt.query, ) defer func() { onDone(finalErr) }() - if !s.conn.isReady() { + if !stmt.conn.isReady() { return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) } - switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m { + switch m := queryModeFromContext(ctx, stmt.conn.defaultQueryMode); m { case DataQueryMode: - return s.processor.QueryContext(s.conn.withKeepInCache(ctx), s.query, args) + return stmt.processor.QueryContext(stmt.conn.withKeepInCache(ctx), stmt.query, args) default: return nil, fmt.Errorf("unsupported query mode '%s' for execute query on prepared statement", m) } } -func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ driver.Result, finalErr error) { - onDone := trace.DatabaseSQLOnStmtExec(s.trace, &ctx, +func (stmt *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ driver.Result, finalErr error) { + onDone := trace.DatabaseSQLOnStmtExec(stmt.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).ExecContext"), - s.stmtCtx, s.query, + stmt.ctx, stmt.query, ) defer func() { onDone(finalErr) }() - if !s.conn.isReady() { + if !stmt.conn.isReady() { return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) } - switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m { + switch m := queryModeFromContext(ctx, stmt.conn.defaultQueryMode); m { case DataQueryMode: - return s.processor.ExecContext(s.conn.withKeepInCache(ctx), s.query, args) + return stmt.processor.ExecContext(stmt.conn.withKeepInCache(ctx), stmt.query, args) default: return nil, fmt.Errorf("unsupported query mode '%s' for execute query on prepared statement", m) } } -func (s *stmt) NumInput() int { +func (stmt *stmt) NumInput() int { return -1 } -func (s *stmt) Close() (finalErr error) { - onDone := trace.DatabaseSQLOnStmtClose(s.trace, &s.stmtCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).Close"), +func (stmt *stmt) Close() (finalErr error) { + var ( + ctx = stmt.ctx + onDone = trace.DatabaseSQLOnStmtClose(stmt.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).Close"), + ) ) defer func() { onDone(finalErr) @@ -82,10 +85,10 @@ func (s *stmt) Close() (finalErr error) { return nil } -func (s *stmt) Exec([]driver.Value) (driver.Result, error) { +func (stmt *stmt) Exec([]driver.Value) (driver.Result, error) { return nil, errDeprecated } -func (s *stmt) Query([]driver.Value) (driver.Rows, error) { +func (stmt *stmt) Query([]driver.Value) (driver.Rows, error) { return nil, errDeprecated } diff --git a/internal/xsql/tx.go b/internal/xsql/tx.go index d67813437..2a58e2728 100644 --- a/internal/xsql/tx.go +++ b/internal/xsql/tx.go @@ -14,9 +14,9 @@ import ( ) type tx struct { - conn *conn - txCtx context.Context - tx table.Transaction + conn *conn + ctx context.Context //nolint:containedctx + tx table.Transaction } var ( @@ -45,9 +45,9 @@ func (c *conn) beginTx(ctx context.Context, txOptions driver.TxOptions) (current return nil, badconn.Map(xerrors.WithStackTrace(err)) } c.currentTx = &tx{ - conn: c, - txCtx: ctx, - tx: transaction, + conn: c, + ctx: ctx, + tx: transaction, } return c.currentTx, nil @@ -73,9 +73,12 @@ func (tx *tx) checkTxState() error { } func (tx *tx) Commit() (finalErr error) { - onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.txCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Commit"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxCommit(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Commit"), + tx, + ) ) defer func() { onDone(finalErr) @@ -86,7 +89,7 @@ func (tx *tx) Commit() (finalErr error) { defer func() { tx.conn.currentTx = nil }() - _, err := tx.tx.CommitTx(tx.txCtx) + _, err := tx.tx.CommitTx(tx.ctx) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -95,9 +98,12 @@ func (tx *tx) Commit() (finalErr error) { } func (tx *tx) Rollback() (finalErr error) { - onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.txCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Rollback"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxRollback(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Rollback"), + tx, + ) ) defer func() { onDone(finalErr) @@ -108,7 +114,7 @@ func (tx *tx) Rollback() (finalErr error) { defer func() { tx.conn.currentTx = nil }() - err := tx.tx.Rollback(tx.txCtx) + err := tx.tx.Rollback(tx.ctx) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -121,7 +127,7 @@ func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.Name ) { onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).QueryContext"), - tx.txCtx, tx, query, + tx.ctx, tx, query, ) defer func() { onDone(finalErr) @@ -163,7 +169,7 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named ) { onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).ExecContext"), - tx.txCtx, tx, query, + tx.ctx, tx, query, ) defer func() { onDone(finalErr) @@ -197,7 +203,7 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named func (tx *tx) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).PrepareContext"), - &tx.txCtx, tx, query, + tx.ctx, tx, query, ) defer func() { onDone(finalErr) @@ -209,7 +215,7 @@ func (tx *tx) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, return &stmt{ conn: tx.conn, processor: tx, - stmtCtx: ctx, + ctx: ctx, query: query, trace: tx.conn.trace, }, nil diff --git a/internal/xsql/tx_fake.go b/internal/xsql/tx_fake.go index 459aba718..0fc3efd27 100644 --- a/internal/xsql/tx_fake.go +++ b/internal/xsql/tx_fake.go @@ -12,15 +12,15 @@ import ( ) type txFake struct { - beginCtx context.Context + beginCtx context.Context //nolint:containedctx conn *conn - ctx context.Context + ctx context.Context //nolint:containedctx } func (tx *txFake) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).PrepareContext"), - &tx.beginCtx, tx, query, + tx.beginCtx, tx, query, ) defer func() { onDone(finalErr) @@ -32,7 +32,7 @@ func (tx *txFake) PrepareContext(ctx context.Context, query string) (_ driver.St return &stmt{ conn: tx.conn, processor: tx, - stmtCtx: ctx, + ctx: ctx, query: query, trace: tx.conn.trace, }, nil @@ -57,9 +57,12 @@ func (tx *txFake) ID() string { } func (tx *txFake) Commit() (err error) { - onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Commit"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxCommit(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Commit"), + tx, + ) ) defer func() { onDone(err) @@ -75,9 +78,12 @@ func (tx *txFake) Commit() (err error) { } func (tx *txFake) Rollback() (err error) { - onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Rollback"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxRollback(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Rollback"), + tx, + ) ) defer func() { onDone(err) diff --git a/log/context_test.go b/log/context_test.go index 92882b887..0f71d9b38 100644 --- a/log/context_test.go +++ b/log/context_test.go @@ -13,7 +13,7 @@ import ( func TestLevelFromContext(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx lvl Level }{ { @@ -37,7 +37,7 @@ func TestLevelFromContext(t *testing.T) { func TestNamesFromContext(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx names []string }{ { diff --git a/trace/sql.go b/trace/sql.go index 9ee597a39..f18e53826 100644 --- a/trace/sql.go +++ b/trace/sql.go @@ -101,7 +101,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - TxContext *context.Context + TxContext context.Context //nolint:containedctx Tx tableTransactionInfo Query string } @@ -193,7 +193,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - TxContext context.Context + TxContext context.Context //nolint:containedctx Tx tableTransactionInfo Query string } @@ -209,7 +209,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - TxContext context.Context + TxContext context.Context //nolint:containedctx Tx tableTransactionInfo Query string } @@ -262,7 +262,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - StmtContext context.Context + StmtContext context.Context //nolint:containedctx Query string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -277,7 +277,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - StmtContext context.Context + StmtContext context.Context //nolint:containedctx Query string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals diff --git a/trace/sql_gtrace.go b/trace/sql_gtrace.go index c4a686e1d..b9fad4194 100644 --- a/trace/sql_gtrace.go +++ b/trace/sql_gtrace.go @@ -914,6 +914,7 @@ func (t *DatabaseSQL) onDoTx(d DatabaseSQLDoTxStartInfo) func(DatabaseSQLDoTxInt return res } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnectorConnect(t *DatabaseSQL, c *context.Context, call call) func(_ error, session tableSessionInfo) { var p DatabaseSQLConnectorConnectStartInfo @@ -927,6 +928,7 @@ func DatabaseSQLOnConnectorConnect(t *DatabaseSQL, c *context.Context, call call res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnPing(t *DatabaseSQL, c *context.Context, call call) func(error) { var p DatabaseSQLConnPingStartInfo @@ -939,6 +941,7 @@ func DatabaseSQLOnConnPing(t *DatabaseSQL, c *context.Context, call call) func(e res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnPrepare(t *DatabaseSQL, c *context.Context, call call, query string) func(error) { var p DatabaseSQLConnPrepareStartInfo @@ -952,6 +955,7 @@ func DatabaseSQLOnConnPrepare(t *DatabaseSQL, c *context.Context, call call, que res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnClose(t *DatabaseSQL, c *context.Context, call call) func(error) { var p DatabaseSQLConnCloseStartInfo @@ -964,6 +968,7 @@ func DatabaseSQLOnConnClose(t *DatabaseSQL, c *context.Context, call call) func( res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnBegin(t *DatabaseSQL, c *context.Context, call call) func(tx tableTransactionInfo, _ error) { var p DatabaseSQLConnBeginStartInfo @@ -977,6 +982,7 @@ func DatabaseSQLOnConnBegin(t *DatabaseSQL, c *context.Context, call call) func( res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnQuery(t *DatabaseSQL, c *context.Context, call call, query string, mode string, idempotent bool, idleTime time.Duration) func(error) { var p DatabaseSQLConnQueryStartInfo @@ -993,6 +999,7 @@ func DatabaseSQLOnConnQuery(t *DatabaseSQL, c *context.Context, call call, query res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnExec(t *DatabaseSQL, c *context.Context, call call, query string, mode string, idempotent bool, idleTime time.Duration) func(error) { var p DatabaseSQLConnExecStartInfo @@ -1009,6 +1016,7 @@ func DatabaseSQLOnConnExec(t *DatabaseSQL, c *context.Context, call call, query res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnConnIsTableExists(t *DatabaseSQL, c *context.Context, call call, tableName string) func(exists bool, _ error) { var p DatabaseSQLConnIsTableExistsStartInfo @@ -1023,6 +1031,7 @@ func DatabaseSQLOnConnIsTableExists(t *DatabaseSQL, c *context.Context, call cal res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnTxQuery(t *DatabaseSQL, c *context.Context, call call, txContext context.Context, tx tableTransactionInfo, query string) func(error) { var p DatabaseSQLTxQueryStartInfo @@ -1038,6 +1047,7 @@ func DatabaseSQLOnTxQuery(t *DatabaseSQL, c *context.Context, call call, txConte res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnTxExec(t *DatabaseSQL, c *context.Context, call call, txContext context.Context, tx tableTransactionInfo, query string) func(error) { var p DatabaseSQLTxExecStartInfo @@ -1053,8 +1063,9 @@ func DatabaseSQLOnTxExec(t *DatabaseSQL, c *context.Context, call call, txContex res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func DatabaseSQLOnTxPrepare(t *DatabaseSQL, c *context.Context, call call, txContext *context.Context, tx tableTransactionInfo, query string) func(error) { +func DatabaseSQLOnTxPrepare(t *DatabaseSQL, c *context.Context, call call, txContext context.Context, tx tableTransactionInfo, query string) func(error) { var p DatabaseSQLTxPrepareStartInfo p.Context = c p.Call = call @@ -1068,6 +1079,7 @@ func DatabaseSQLOnTxPrepare(t *DatabaseSQL, c *context.Context, call call, txCon res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnTxCommit(t *DatabaseSQL, c *context.Context, call call, tx tableTransactionInfo) func(error) { var p DatabaseSQLTxCommitStartInfo @@ -1081,6 +1093,7 @@ func DatabaseSQLOnTxCommit(t *DatabaseSQL, c *context.Context, call call, tx tab res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnTxRollback(t *DatabaseSQL, c *context.Context, call call, tx tableTransactionInfo) func(error) { var p DatabaseSQLTxRollbackStartInfo @@ -1094,6 +1107,7 @@ func DatabaseSQLOnTxRollback(t *DatabaseSQL, c *context.Context, call call, tx t res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnStmtQuery(t *DatabaseSQL, c *context.Context, call call, stmtContext context.Context, query string) func(error) { var p DatabaseSQLStmtQueryStartInfo @@ -1108,6 +1122,7 @@ func DatabaseSQLOnStmtQuery(t *DatabaseSQL, c *context.Context, call call, stmtC res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnStmtExec(t *DatabaseSQL, c *context.Context, call call, stmtContext context.Context, query string) func(error) { var p DatabaseSQLStmtExecStartInfo @@ -1122,6 +1137,7 @@ func DatabaseSQLOnStmtExec(t *DatabaseSQL, c *context.Context, call call, stmtCo res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnStmtClose(t *DatabaseSQL, stmtContext *context.Context, call call) func(error) { var p DatabaseSQLStmtCloseStartInfo @@ -1134,6 +1150,7 @@ func DatabaseSQLOnStmtClose(t *DatabaseSQL, stmtContext *context.Context, call c res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DatabaseSQLOnDoTx(t *DatabaseSQL, c *context.Context, call call, iD string, idempotent bool) func(error) func(attempts int, _ error) { var p DatabaseSQLDoTxStartInfo diff --git a/trace/topic.go b/trace/topic.go index 53077a3b8..ad20bc980 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -96,7 +96,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderPartitionReadStartResponseStartInfo struct { ReaderConnectionID string - PartitionContext context.Context + PartitionContext *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -118,7 +118,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderPartitionReadStopResponseStartInfo struct { ReaderConnectionID string - PartitionContext context.Context + PartitionContext *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -197,7 +197,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReadMessagesStartInfo struct { - RequestContext context.Context + RequestContext *context.Context MinCount int MaxCount int FreeBufferCapacity int @@ -239,7 +239,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCommitStartInfo struct { - RequestContext context.Context + RequestContext *context.Context Topic string PartitionID int64 PartitionSessionID int64 diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index 5f9dcde80..3fad38d02 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -995,6 +995,7 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe } fn(t1) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderStart(t *Topic, readerID int64, consumer string) { var p TopicReaderStartInfo @@ -1002,6 +1003,7 @@ func TopicOnReaderStart(t *Topic, readerID int64, consumer string) { p.Consumer = consumer t.onReaderStart(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnect(t *Topic, reason error) func(error) { var p TopicReaderReconnectStartInfo @@ -1013,6 +1015,7 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo @@ -1020,8 +1023,9 @@ func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { p.WasSent = wasSent t.onReaderReconnectRequest(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { +func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo p.ReaderConnectionID = readerConnectionID p.PartitionContext = partitionContext @@ -1037,8 +1041,9 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { +func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo p.ReaderConnectionID = readerConnectionID p.PartitionContext = partitionContext @@ -1054,8 +1059,9 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderCommit(t *Topic, requestContext context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { +func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo p.RequestContext = requestContext p.Topic = topic @@ -1070,6 +1076,7 @@ func TopicOnReaderCommit(t *Topic, requestContext context.Context, topic string, res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo @@ -1081,6 +1088,7 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo @@ -1091,6 +1099,7 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str p.CommittedOffset = committedOffset t.onReaderCommittedNotify(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo @@ -1103,6 +1112,7 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo @@ -1116,6 +1126,7 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { var p TopicReaderErrorInfo @@ -1123,6 +1134,7 @@ func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { p.Error = e t.onReaderError(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo @@ -1140,6 +1152,7 @@ func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen } } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo @@ -1148,6 +1161,7 @@ func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBy p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo @@ -1161,8 +1175,9 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReadMessages(t *Topic, requestContext context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { +func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo p.RequestContext = requestContext p.MinCount = minCount @@ -1182,6 +1197,7 @@ func TopicOnReaderReadMessages(t *Topic, requestContext context.Context, minCoun res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo @@ -1189,6 +1205,7 @@ func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e erro p.Error = e t.onReaderUnknownGrpcMessage(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(error) { var p TopicWriterReconnectStartInfo @@ -1203,6 +1220,7 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo @@ -1217,6 +1235,7 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo @@ -1229,6 +1248,7 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo @@ -1245,6 +1265,7 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo @@ -1260,6 +1281,7 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo