Skip to content

Commit

Permalink
Merge pull request #1211 from ydb-platform/containedctx
Browse files Browse the repository at this point in the history
Enabled `containedctx` linter
  • Loading branch information
asmyasnikov authored Apr 23, 2024
2 parents 9293016 + 6bbf708 commit d716a3b
Show file tree
Hide file tree
Showing 31 changed files with 176 additions and 143 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ linters-settings:
linters:
enable-all: true
disable:
- containedctx
- contextcheck
- cyclop
- depguard
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Refactored internals for enabling `containedctx` linter
* Fixed the hanging semaphore issue on coordination session reconnect

## v3.65.3
Expand Down
2 changes: 0 additions & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ var _ Connection = (*Driver)(nil)

// Driver type provide access to YDB service clients
type Driver struct {
ctx context.Context // cancel while Driver.Close called.
ctxCancel context.CancelFunc

userInfo *dsn.UserInfo
Expand Down Expand Up @@ -311,7 +310,6 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e

d := &Driver{
children: make(map[uint64]*Driver),
ctx: ctx,
ctxCancel: driverCtxCancel,
}

Expand Down
4 changes: 2 additions & 2 deletions examples/topic/topicreader/topicreader_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ExplicitPartitionStartStopHandler(ctx context.Context, db *ydb.Driver) {
) func(
trace.TopicReaderPartitionReadStartResponseDoneInfo,
) {
err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
err := externalSystemLock(*info.PartitionContext, info.Topic, info.PartitionID)
if err != nil {
stopReader()
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func PartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db
) func(
trace.TopicReaderPartitionReadStartResponseDoneInfo,
) {
err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
err := externalSystemLock(*info.PartitionContext, info.Topic, info.PartitionID)
if err != nil {
stopReader()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/background/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (

// A Worker must not be copied after first use
type Worker struct {
ctx context.Context
ctx context.Context //nolint:containedctx
workers sync.WaitGroup
closeReason error
tasksCompleted empty.Chan
Expand Down
1 change: 0 additions & 1 deletion internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,6 @@ func (c *conn) NewStream(

return &grpcClientStream{
ClientStream: s,
ctx: ctx,
c: c,
wrapping: useWrapping,
traceID: traceID,
Expand Down
29 changes: 17 additions & 12 deletions internal/conn/grpc_client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

type grpcClientStream struct {
grpc.ClientStream
ctx context.Context
c *conn
wrapping bool
traceID string
Expand All @@ -25,9 +24,11 @@ type grpcClientStream struct {
}

func (s *grpcClientStream) CloseSend() (err error) {
ctx := s.ctx
onDone := trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
var (
ctx = s.Context()
onDone = trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
)
)
defer func() {
onDone(err)
Expand Down Expand Up @@ -60,9 +61,11 @@ func (s *grpcClientStream) CloseSend() (err error) {
}

func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
ctx := s.ctx
onDone := trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
var (
ctx = s.Context()
onDone = trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
)
)
defer func() {
onDone(err)
Expand All @@ -79,7 +82,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
}

defer func() {
s.c.onTransportError(s.Context(), err)
s.c.onTransportError(ctx, err)
}()

if s.wrapping {
Expand All @@ -103,9 +106,11 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
}

func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
ctx := s.ctx
onDone := trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
var (
ctx = s.Context()
onDone = trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
)
)
defer func() {
onDone(err)
Expand All @@ -130,7 +135,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {

defer func() {
if !xerrors.Is(err, io.EOF) {
s.c.onTransportError(s.Context(), err)
s.c.onTransportError(ctx, err)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion internal/coordination/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestDescribeNodeRequest(t *testing.T) {
func TestOperationParams(t *testing.T) {
for _, tt := range []struct {
name string
ctx context.Context
ctx context.Context //nolint:containedctx
config interface {
OperationTimeout() time.Duration
OperationCancelAfter() time.Duration
Expand Down
4 changes: 2 additions & 2 deletions internal/coordination/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type session struct {
options *options.CreateSessionOptions
client *Client

ctx context.Context
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
sessionClosedChan chan struct{}
controller *conversation.Controller
Expand All @@ -37,7 +37,7 @@ type session struct {
type lease struct {
session *session
name string
ctx context.Context
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
}

Expand Down
2 changes: 1 addition & 1 deletion internal/meta/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestContext(t *testing.T) {
for _, tt := range []struct {
name string
ctx context.Context
ctx context.Context //nolint:containedctx
header string
values []string
}{
Expand Down
2 changes: 1 addition & 1 deletion internal/operation/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestParams(t *testing.T) {
for _, tt := range []struct {
ctx context.Context
ctx context.Context //nolint:containedctx
preferContextTimeout bool
timeout time.Duration
cancelAfter time.Duration
Expand Down
23 changes: 16 additions & 7 deletions internal/query/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var _ query.Row = (*row)(nil)

type row struct {
ctx context.Context
ctx context.Context //nolint:containedctx
trace *trace.Query

indexedScanner scanner.IndexedScanner
Expand All @@ -35,8 +35,11 @@ func newRow(ctx context.Context, columns []*Ydb.Column, v *Ydb.Value, t *trace.Q
}

func (r row) Scan(dst ...interface{}) (err error) {
onDone := trace.QueryOnRowScan(r.trace, &r.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"),
var (
ctx = r.ctx
onDone = trace.QueryOnRowScan(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"),
)
)
defer func() {
onDone(err)
Expand All @@ -46,8 +49,11 @@ func (r row) Scan(dst ...interface{}) (err error) {
}

func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) {
onDone := trace.QueryOnRowScanNamed(r.trace, &r.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"),
var (
ctx = r.ctx
onDone = trace.QueryOnRowScanNamed(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"),
)
)
defer func() {
onDone(err)
Expand All @@ -57,8 +63,11 @@ func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) {
}

func (r row) ScanStruct(dst interface{}, opts ...scanner.ScanStructOption) (err error) {
onDone := trace.QueryOnRowScanStruct(r.trace, &r.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"),
var (
ctx = r.ctx
onDone = trace.QueryOnRowScanStruct(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"),
)
)
defer func() {
onDone(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/table/scanner/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func NewResultSet(a *allocator.Allocator, opts ...ResultSetOption) *Ydb.ResultSe

func TestNewStreamWithRecvFirstResultSet(t *testing.T) {
for _, tt := range []struct {
ctx context.Context
ctx context.Context //nolint:containedctx
recvCounter int
err error
}{
Expand Down
2 changes: 1 addition & 1 deletion internal/topic/topicreaderinternal/partition_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type partitionSession struct {
readerID int64
connectionID string

ctx context.Context
ctx context.Context //nolint:containedctx
ctxCancel context.CancelFunc
partitionSessionID rawtopicreader.PartitionSessionID

Expand Down
1 change: 0 additions & 1 deletion internal/topic/topicreaderinternal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func NewReader(
cfg.OperationTimeout(),
cfg.RetrySettings,
cfg.Trace,
cfg.BaseContext,
),
defaultBatchConfig: cfg.DefaultBatchConfig,
tracer: cfg.Trace,
Expand Down
46 changes: 26 additions & 20 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type partitionSessionID = rawtopicreader.PartitionSessionID

type topicStreamReaderImpl struct {
cfg topicStreamReaderConfig
ctx context.Context
ctx context.Context //nolint:containedctx
cancel context.CancelFunc

freeBytes chan int
Expand All @@ -60,7 +60,7 @@ type topicStreamReaderImpl struct {
type topicStreamReaderConfig struct {
CommitterBatchTimeLag time.Duration
CommitterBatchCounterTrigger int
BaseContext context.Context
BaseContext context.Context //nolint:containedctx
BufferSizeProtoBytes int
Cred credentials.Credentials
CredUpdateInterval time.Duration
Expand Down Expand Up @@ -179,7 +179,7 @@ func (r *topicStreamReaderImpl) ReadMessageBatch(
) (batch *PublicBatch, err error) {
onDone := trace.TopicOnReaderReadMessages(
r.cfg.Trace,
ctx,
&ctx,
opts.MinCount,
opts.MaxCount,
r.getRestBufferBytes(),
Expand Down Expand Up @@ -295,15 +295,18 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer(
return err
}

onDone := trace.TopicOnReaderPartitionReadStopResponse(
r.cfg.Trace,
r.readConnectionID,
session.Context(),
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
msg.CommittedOffset.ToInt64(),
msg.Graceful,
var (
ctx = session.Context()
onDone = trace.TopicOnReaderPartitionReadStopResponse(
r.cfg.Trace,
r.readConnectionID,
&ctx,
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
msg.CommittedOffset.ToInt64(),
msg.Graceful,
)
)
defer func() {
onDone(err)
Expand Down Expand Up @@ -357,7 +360,7 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRa
session := commitRange.partitionSession
onDone := trace.TopicOnReaderCommit(
r.cfg.Trace,
ctx,
&ctx,
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
Expand Down Expand Up @@ -768,13 +771,16 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer(
return err
}

onDone := trace.TopicOnReaderPartitionReadStartResponse(
r.cfg.Trace,
r.readConnectionID,
session.Context(),
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
var (
ctx = session.Context()
onDone = trace.TopicOnReaderPartitionReadStartResponse(
r.cfg.Trace,
r.readConnectionID,
&ctx,
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
)
)

respMessage := &rawtopicreader.StartPartitionSessionResponse{
Expand Down
10 changes: 5 additions & 5 deletions internal/topic/topicreaderinternal/stream_reader_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll
expected := trace.TopicReaderPartitionReadStopResponseStartInfo{
ReaderConnectionID: e.reader.readConnectionID,
PartitionContext: e.partitionSession.ctx,
PartitionContext: &e.partitionSession.ctx,
Topic: e.partitionSession.Topic,
PartitionID: e.partitionSession.PartitionID,
PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(),
Expand All @@ -388,7 +388,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
}
require.Equal(t, expected, info)

require.NoError(t, info.PartitionContext.Err())
require.NoError(t, (*info.PartitionContext).Err())

readMessagesCtxCancel()

Expand Down Expand Up @@ -424,15 +424,15 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll
expected := trace.TopicReaderPartitionReadStopResponseStartInfo{
ReaderConnectionID: e.reader.readConnectionID,
PartitionContext: e.partitionSession.ctx,
PartitionContext: &e.partitionSession.ctx,
Topic: e.partitionSession.Topic,
PartitionID: e.partitionSession.PartitionID,
PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(),
CommittedOffset: committedOffset,
Graceful: false,
}
require.Equal(t, expected, info)
require.Error(t, info.PartitionContext.Err())
require.Error(t, (*info.PartitionContext).Err())

readMessagesCtxCancel()

Expand Down Expand Up @@ -954,7 +954,7 @@ func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) {
}

type streamEnv struct {
ctx context.Context
ctx context.Context //nolint:containedctx
t testing.TB
reader *topicStreamReaderImpl
stopReadEvents empty.Chan
Expand Down
Loading

0 comments on commit d716a3b

Please sign in to comment.