Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabled containedctx linter #1211

Merged
merged 2 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ linters-settings:
linters:
enable-all: true
disable:
- containedctx
- contextcheck
- cyclop
- depguard
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Refactored internals for enabling `containedctx` linter
* Fixed the hanging semaphore issue on coordination session reconnect

## v3.65.3
Expand Down
2 changes: 0 additions & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ var _ Connection = (*Driver)(nil)

// Driver type provide access to YDB service clients
type Driver struct {
ctx context.Context // cancel while Driver.Close called.
ctxCancel context.CancelFunc

userInfo *dsn.UserInfo
Expand Down Expand Up @@ -311,7 +310,6 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e

d := &Driver{
children: make(map[uint64]*Driver),
ctx: ctx,
ctxCancel: driverCtxCancel,
}

Expand Down
4 changes: 2 additions & 2 deletions examples/topic/topicreader/topicreader_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ExplicitPartitionStartStopHandler(ctx context.Context, db *ydb.Driver) {
) func(
trace.TopicReaderPartitionReadStartResponseDoneInfo,
) {
err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
err := externalSystemLock(*info.PartitionContext, info.Topic, info.PartitionID)
if err != nil {
stopReader()
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func PartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db
) func(
trace.TopicReaderPartitionReadStartResponseDoneInfo,
) {
err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
err := externalSystemLock(*info.PartitionContext, info.Topic, info.PartitionID)
if err != nil {
stopReader()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/background/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (

// A Worker must not be copied after first use
type Worker struct {
ctx context.Context
ctx context.Context //nolint:containedctx
workers sync.WaitGroup
closeReason error
tasksCompleted empty.Chan
Expand Down
1 change: 0 additions & 1 deletion internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,6 @@ func (c *conn) NewStream(

return &grpcClientStream{
ClientStream: s,
ctx: ctx,
c: c,
wrapping: useWrapping,
traceID: traceID,
Expand Down
29 changes: 17 additions & 12 deletions internal/conn/grpc_client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

type grpcClientStream struct {
grpc.ClientStream
ctx context.Context
c *conn
wrapping bool
traceID string
Expand All @@ -25,9 +24,11 @@ type grpcClientStream struct {
}

func (s *grpcClientStream) CloseSend() (err error) {
ctx := s.ctx
onDone := trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
var (
ctx = s.Context()
onDone = trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
)
)
defer func() {
onDone(err)
Expand Down Expand Up @@ -60,9 +61,11 @@ func (s *grpcClientStream) CloseSend() (err error) {
}

func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
ctx := s.ctx
onDone := trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
var (
ctx = s.Context()
onDone = trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
)
)
defer func() {
onDone(err)
Expand All @@ -79,7 +82,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
}

defer func() {
s.c.onTransportError(s.Context(), err)
s.c.onTransportError(ctx, err)
}()

if s.wrapping {
Expand All @@ -103,9 +106,11 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
}

func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
ctx := s.ctx
onDone := trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
var (
ctx = s.Context()
onDone = trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
)
)
defer func() {
onDone(err)
Expand All @@ -130,7 +135,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {

defer func() {
if !xerrors.Is(err, io.EOF) {
s.c.onTransportError(s.Context(), err)
s.c.onTransportError(ctx, err)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion internal/coordination/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestDescribeNodeRequest(t *testing.T) {
func TestOperationParams(t *testing.T) {
for _, tt := range []struct {
name string
ctx context.Context
ctx context.Context //nolint:containedctx
config interface {
OperationTimeout() time.Duration
OperationCancelAfter() time.Duration
Expand Down
4 changes: 2 additions & 2 deletions internal/coordination/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type session struct {
options *options.CreateSessionOptions
client *Client

ctx context.Context
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
sessionClosedChan chan struct{}
controller *conversation.Controller
Expand All @@ -37,7 +37,7 @@ type session struct {
type lease struct {
session *session
name string
ctx context.Context
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
}

Expand Down
2 changes: 1 addition & 1 deletion internal/meta/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestContext(t *testing.T) {
for _, tt := range []struct {
name string
ctx context.Context
ctx context.Context //nolint:containedctx
header string
values []string
}{
Expand Down
2 changes: 1 addition & 1 deletion internal/operation/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestParams(t *testing.T) {
for _, tt := range []struct {
ctx context.Context
ctx context.Context //nolint:containedctx
preferContextTimeout bool
timeout time.Duration
cancelAfter time.Duration
Expand Down
23 changes: 16 additions & 7 deletions internal/query/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var _ query.Row = (*row)(nil)

type row struct {
ctx context.Context
ctx context.Context //nolint:containedctx
trace *trace.Query

indexedScanner scanner.IndexedScanner
Expand All @@ -35,8 +35,11 @@ func newRow(ctx context.Context, columns []*Ydb.Column, v *Ydb.Value, t *trace.Q
}

func (r row) Scan(dst ...interface{}) (err error) {
onDone := trace.QueryOnRowScan(r.trace, &r.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"),
var (
ctx = r.ctx
onDone = trace.QueryOnRowScan(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"),
)
)
defer func() {
onDone(err)
Expand All @@ -46,8 +49,11 @@ func (r row) Scan(dst ...interface{}) (err error) {
}

func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) {
onDone := trace.QueryOnRowScanNamed(r.trace, &r.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"),
var (
ctx = r.ctx
onDone = trace.QueryOnRowScanNamed(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"),
)
)
defer func() {
onDone(err)
Expand All @@ -57,8 +63,11 @@ func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) {
}

func (r row) ScanStruct(dst interface{}, opts ...scanner.ScanStructOption) (err error) {
onDone := trace.QueryOnRowScanStruct(r.trace, &r.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"),
var (
ctx = r.ctx
onDone = trace.QueryOnRowScanStruct(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"),
)
)
defer func() {
onDone(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/table/scanner/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func NewResultSet(a *allocator.Allocator, opts ...ResultSetOption) *Ydb.ResultSe

func TestNewStreamWithRecvFirstResultSet(t *testing.T) {
for _, tt := range []struct {
ctx context.Context
ctx context.Context //nolint:containedctx
recvCounter int
err error
}{
Expand Down
2 changes: 1 addition & 1 deletion internal/topic/topicreaderinternal/partition_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type partitionSession struct {
readerID int64
connectionID string

ctx context.Context
ctx context.Context //nolint:containedctx
ctxCancel context.CancelFunc
partitionSessionID rawtopicreader.PartitionSessionID

Expand Down
1 change: 0 additions & 1 deletion internal/topic/topicreaderinternal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func NewReader(
cfg.OperationTimeout(),
cfg.RetrySettings,
cfg.Trace,
cfg.BaseContext,
),
defaultBatchConfig: cfg.DefaultBatchConfig,
tracer: cfg.Trace,
Expand Down
46 changes: 26 additions & 20 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type partitionSessionID = rawtopicreader.PartitionSessionID

type topicStreamReaderImpl struct {
cfg topicStreamReaderConfig
ctx context.Context
ctx context.Context //nolint:containedctx
cancel context.CancelFunc

freeBytes chan int
Expand All @@ -60,7 +60,7 @@ type topicStreamReaderImpl struct {
type topicStreamReaderConfig struct {
CommitterBatchTimeLag time.Duration
CommitterBatchCounterTrigger int
BaseContext context.Context
BaseContext context.Context //nolint:containedctx
BufferSizeProtoBytes int
Cred credentials.Credentials
CredUpdateInterval time.Duration
Expand Down Expand Up @@ -179,7 +179,7 @@ func (r *topicStreamReaderImpl) ReadMessageBatch(
) (batch *PublicBatch, err error) {
onDone := trace.TopicOnReaderReadMessages(
r.cfg.Trace,
ctx,
&ctx,
opts.MinCount,
opts.MaxCount,
r.getRestBufferBytes(),
Expand Down Expand Up @@ -295,15 +295,18 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer(
return err
}

onDone := trace.TopicOnReaderPartitionReadStopResponse(
r.cfg.Trace,
r.readConnectionID,
session.Context(),
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
msg.CommittedOffset.ToInt64(),
msg.Graceful,
var (
ctx = session.Context()
onDone = trace.TopicOnReaderPartitionReadStopResponse(
r.cfg.Trace,
r.readConnectionID,
&ctx,
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
msg.CommittedOffset.ToInt64(),
msg.Graceful,
)
)
defer func() {
onDone(err)
Expand Down Expand Up @@ -357,7 +360,7 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRa
session := commitRange.partitionSession
onDone := trace.TopicOnReaderCommit(
r.cfg.Trace,
ctx,
&ctx,
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
Expand Down Expand Up @@ -768,13 +771,16 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer(
return err
}

onDone := trace.TopicOnReaderPartitionReadStartResponse(
r.cfg.Trace,
r.readConnectionID,
session.Context(),
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
var (
ctx = session.Context()
onDone = trace.TopicOnReaderPartitionReadStartResponse(
r.cfg.Trace,
r.readConnectionID,
&ctx,
session.Topic,
session.PartitionID,
session.partitionSessionID.ToInt64(),
)
)

respMessage := &rawtopicreader.StartPartitionSessionResponse{
Expand Down
10 changes: 5 additions & 5 deletions internal/topic/topicreaderinternal/stream_reader_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll
expected := trace.TopicReaderPartitionReadStopResponseStartInfo{
ReaderConnectionID: e.reader.readConnectionID,
PartitionContext: e.partitionSession.ctx,
PartitionContext: &e.partitionSession.ctx,
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved
Topic: e.partitionSession.Topic,
PartitionID: e.partitionSession.PartitionID,
PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(),
Expand All @@ -388,7 +388,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
}
require.Equal(t, expected, info)

require.NoError(t, info.PartitionContext.Err())
require.NoError(t, (*info.PartitionContext).Err())

readMessagesCtxCancel()

Expand Down Expand Up @@ -424,15 +424,15 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll
expected := trace.TopicReaderPartitionReadStopResponseStartInfo{
ReaderConnectionID: e.reader.readConnectionID,
PartitionContext: e.partitionSession.ctx,
PartitionContext: &e.partitionSession.ctx,
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved
Topic: e.partitionSession.Topic,
PartitionID: e.partitionSession.PartitionID,
PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(),
CommittedOffset: committedOffset,
Graceful: false,
}
require.Equal(t, expected, info)
require.Error(t, info.PartitionContext.Err())
require.Error(t, (*info.PartitionContext).Err())

readMessagesCtxCancel()

Expand Down Expand Up @@ -954,7 +954,7 @@ func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) {
}

type streamEnv struct {
ctx context.Context
ctx context.Context //nolint:containedctx
t testing.TB
reader *topicStreamReaderImpl
stopReadEvents empty.Chan
Expand Down
Loading
Loading