diff --git a/.golangci.yml b/.golangci.yml index e2100ce23..102d0e3ed 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -218,7 +218,6 @@ linters-settings: linters: enable-all: true disable: - - containedctx - contextcheck - cyclop - depguard diff --git a/CHANGELOG.md b/CHANGELOG.md index 03957b2fc..a6d38dea6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## v3.66.0 +* Added experimental package `retry/budget` for limit second and subsequent retry attempts +* Refactored internals for enabling `containedctx` linter +* Fixed the hanging semaphore issue on coordination session reconnect + +## v3.65.3 +* Fixed data race in `internal/conn.grpcClientStream` + +## v3.65.2 +* Fixed data race using `log.WithNames` + ## v3.65.1 * Updated dependency `ydb-go-genproto` * Added processing of `Ydb.StatusIds_EXTERNAL_ERROR` in `retry.Retry` diff --git a/config/config.go b/config/config.go index 28ffd0f90..370dd283d 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ import ( balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -157,6 +158,13 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol } } +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func WithRetryBudget(b budget.Budget) Option { + return func(c *Config) { + config.SetRetryBudget(&c.Common, b) + } +} + func WithTraceRetry(t *trace.Retry, opts ...trace.RetryComposeOption) Option { return func(c *Config) { config.SetTraceRetry(&c.Common, t, opts...) diff --git a/driver.go b/driver.go index 592466443..170e2f86d 100644 --- a/driver.go +++ b/driver.go @@ -51,7 +51,6 @@ var _ Connection = (*Driver)(nil) // Driver type provide access to YDB service clients type Driver struct { - ctx context.Context // cancel while Driver.Close called. ctxCancel context.CancelFunc userInfo *dsn.UserInfo @@ -311,7 +310,6 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e d := &Driver{ children: make(map[uint64]*Driver), - ctx: ctx, ctxCancel: driverCtxCancel, } diff --git a/examples/go.mod b/examples/go.mod index 0e3ecce2c..29d8cfcfc 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -53,10 +53,10 @@ require ( github.com/yandex-cloud/go-genproto v0.0.0-20220815090733-4c139c0154e2 // indirect github.com/ydb-platform/ydb-go-genproto v0.0.0-20240316140903-4a47abca1cca // indirect github.com/ydb-platform/ydb-go-yc-metadata v0.5.4 // indirect - golang.org/x/crypto v0.17.0 // indirect + golang.org/x/crypto v0.21.0 // indirect golang.org/x/mod v0.11.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.7.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect diff --git a/examples/go.sum b/examples/go.sum index f3fbe27f4..e65a7e091 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -1270,8 +1270,8 @@ golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1401,8 +1401,9 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1552,8 +1553,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/examples/topic/topicreader/topicreader_trace.go b/examples/topic/topicreader/topicreader_trace.go index b31390fb2..0a332efa8 100644 --- a/examples/topic/topicreader/topicreader_trace.go +++ b/examples/topic/topicreader/topicreader_trace.go @@ -40,7 +40,7 @@ func ExplicitPartitionStartStopHandler(ctx context.Context, db *ydb.Driver) { ) func( trace.TopicReaderPartitionReadStartResponseDoneInfo, ) { - err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID) + err := externalSystemLock(*info.PartitionContext, info.Topic, info.PartitionID) if err != nil { stopReader() } @@ -105,7 +105,7 @@ func PartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db ) func( trace.TopicReaderPartitionReadStartResponseDoneInfo, ) { - err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID) + err := externalSystemLock(*info.PartitionContext, info.Topic, info.PartitionID) if err != nil { stopReader() } diff --git a/go.mod b/go.mod index 6f67d5b64..6b27ea49d 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/google/uuid v1.3.0 github.com/jonboulle/clockwork v0.3.0 github.com/ydb-platform/ydb-go-genproto v0.0.0-20240316140903-4a47abca1cca - golang.org/x/net v0.17.0 + golang.org/x/net v0.23.0 golang.org/x/sync v0.3.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/grpc v1.57.1 @@ -25,8 +25,8 @@ require ( github.com/davecgh/go-spew v1.1.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect gopkg.in/yaml.v3 v3.0.0 // indirect ) diff --git a/go.sum b/go.sum index 8d8988e36..c8dbabbb6 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -100,12 +100,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/background/worker.go b/internal/background/worker.go index 91c0d1686..c41ad4b0a 100644 --- a/internal/background/worker.go +++ b/internal/background/worker.go @@ -19,7 +19,7 @@ var ( // A Worker must not be copied after first use type Worker struct { - ctx context.Context + ctx context.Context //nolint:containedctx workers sync.WaitGroup closeReason error tasksCompleted empty.Chan diff --git a/internal/backoff/delay.go b/internal/backoff/delay.go new file mode 100644 index 000000000..21b07522d --- /dev/null +++ b/internal/backoff/delay.go @@ -0,0 +1,43 @@ +package backoff + +import ( + "time" +) + +type ( + delayOptions struct { + fast Backoff + slow Backoff + } + delayOption func(o *delayOptions) +) + +func WithFastBackoff(fast Backoff) delayOption { + return func(o *delayOptions) { + o.fast = fast + } +} + +func WithSlowBackoff(slow Backoff) delayOption { + return func(o *delayOptions) { + o.slow = slow + } +} + +func Delay(t Type, i int, opts ...delayOption) time.Duration { + optionsHolder := delayOptions{ + fast: Fast, + slow: Slow, + } + for _, opt := range opts { + opt(&optionsHolder) + } + switch t { + case TypeFast: + return optionsHolder.fast.Delay(i) + case TypeSlow: + return optionsHolder.slow.Delay(i) + default: + return 0 + } +} diff --git a/internal/backoff/delay_test.go b/internal/backoff/delay_test.go new file mode 100644 index 000000000..7c813b89a --- /dev/null +++ b/internal/backoff/delay_test.go @@ -0,0 +1,92 @@ +package backoff + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" +) + +func TestDelay(t *testing.T) { + for _, tt := range []struct { + name string + act time.Duration + exp time.Duration + }{ + { + name: xtest.CurrentFileLine(), + act: Delay(TypeNoBackoff, 0), + exp: 0, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeNoBackoff, 1), + exp: 0, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeNoBackoff, 2), + exp: 0, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeFast, 0, WithFastBackoff(New( + WithSlotDuration(fastSlot), + WithCeiling(6), + WithJitterLimit(1), + ))), + exp: 5 * time.Millisecond, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeFast, 1, WithFastBackoff(New( + WithSlotDuration(fastSlot), + WithCeiling(6), + WithJitterLimit(1), + ))), + exp: 10 * time.Millisecond, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeFast, 3, WithFastBackoff(New( + WithSlotDuration(fastSlot), + WithCeiling(6), + WithJitterLimit(1), + ))), + exp: 40 * time.Millisecond, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeSlow, 0, WithSlowBackoff(New( + WithSlotDuration(slowSlot), + WithCeiling(6), + WithJitterLimit(1), + ))), + exp: time.Second, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeSlow, 1, WithSlowBackoff(New( + WithSlotDuration(slowSlot), + WithCeiling(6), + WithJitterLimit(1), + ))), + exp: 2 * time.Second, + }, + { + name: xtest.CurrentFileLine(), + act: Delay(TypeSlow, 3, WithSlowBackoff(New( + WithSlotDuration(slowSlot), + WithCeiling(6), + WithJitterLimit(1), + ))), + exp: 8 * time.Second, + }, + } { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.exp, tt.act) + }) + } +} diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index f69ec11e2..b33f7266a 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -89,6 +89,7 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) { }, retry.WithIdempotent(true), retry.WithTrace(b.driverConfig.TraceRetry()), + retry.WithBudget(b.driverConfig.RetryBudget()), ) } diff --git a/internal/config/config.go b/internal/config/config.go index dcf89c4b7..051571bbd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,14 +3,18 @@ package config import ( "time" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +var defaultRetryBudget = budget.Limited(-1) + type Common struct { operationTimeout time.Duration operationCancelAfter time.Duration disableAutoRetry bool traceRetry trace.Retry + retryBudget budget.Budget panicCallback func(e interface{}) } @@ -48,6 +52,14 @@ func (c *Common) TraceRetry() *trace.Retry { return &c.traceRetry } +func (c *Common) RetryBudget() budget.Budget { + if c.retryBudget == nil { + return defaultRetryBudget + } + + return c.retryBudget +} + // SetOperationTimeout define the maximum amount of time a YDB server will process // an operation. After timeout exceeds YDB will try to cancel operation and // regardless of the cancellation appropriate error will be returned to @@ -81,3 +93,7 @@ func SetAutoRetry(c *Common, autoRetry bool) { func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption) { c.traceRetry = *c.traceRetry.Compose(t, opts...) } + +func SetRetryBudget(c *Common, b budget.Budget) { + c.retryBudget = b +} diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 8192e38e7..d7e05bd42 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -462,7 +462,6 @@ func (c *conn) NewStream( return &grpcClientStream{ ClientStream: s, - ctx: ctx, c: c, wrapping: useWrapping, traceID: traceID, diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index 32377e5ab..ea7c08cfe 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -16,7 +16,6 @@ import ( type grpcClientStream struct { grpc.ClientStream - ctx context.Context c *conn wrapping bool traceID string @@ -25,8 +24,11 @@ type grpcClientStream struct { } func (s *grpcClientStream) CloseSend() (err error) { - onDone := trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &s.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"), + var ( + ctx = s.Context() + onDone = trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"), + ) ) defer func() { onDone(err) @@ -59,8 +61,11 @@ func (s *grpcClientStream) CloseSend() (err error) { } func (s *grpcClientStream) SendMsg(m interface{}) (err error) { - onDone := trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &s.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"), + var ( + ctx = s.Context() + onDone = trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"), + ) ) defer func() { onDone(err) @@ -77,7 +82,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { } defer func() { - s.c.onTransportError(s.Context(), err) + s.c.onTransportError(ctx, err) }() if s.wrapping { @@ -101,8 +106,11 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { } func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { - onDone := trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &s.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"), + var ( + ctx = s.Context() + onDone = trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"), + ) ) defer func() { onDone(err) @@ -114,7 +122,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { defer func() { if err != nil { md := s.ClientStream.Trailer() - s.onDone(s.ctx, md) + s.onDone(ctx, md) } }() @@ -127,7 +135,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { defer func() { if !xerrors.Is(err, io.EOF) { - s.c.onTransportError(s.Context(), err) + s.c.onTransportError(ctx, err) } }() diff --git a/internal/coordination/client.go b/internal/coordination/client.go index e7c194049..c3685cc64 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -110,9 +110,15 @@ func (c *Client) CreateNode(ctx context.Context, path string, config coordinatio return createNode(ctx, c.client, request) } - return retry.Retry(ctx, func(ctx context.Context) error { - return createNode(ctx, c.client, request) - }, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry())) + return retry.Retry(ctx, + func(ctx context.Context) error { + return createNode(ctx, c.client, request) + }, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), + ) } func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) (finalErr error) { @@ -137,9 +143,15 @@ func (c *Client) AlterNode(ctx context.Context, path string, config coordination return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, func(ctx context.Context) (err error) { - return alterNode(ctx, c.client, request) - }, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry())) + return retry.Retry(ctx, + func(ctx context.Context) (err error) { + return alterNode(ctx, c.client, request) + }, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), + ) } func alterNodeRequest( @@ -192,9 +204,15 @@ func (c *Client) DropNode(ctx context.Context, path string) (finalErr error) { return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, func(ctx context.Context) (err error) { - return dropNode(ctx, c.client, request) - }, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry())) + return retry.Retry(ctx, + func(ctx context.Context) (err error) { + return dropNode(ctx, c.client, request) + }, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), + ) } func dropNodeRequest(path string, operationParams *Ydb_Operations.OperationParams) *Ydb_Coordination.DropNodeRequest { @@ -241,14 +259,20 @@ func (c *Client) DescribeNode( return describeNode(ctx, c.client, request) } - err := retry.Retry(ctx, func(ctx context.Context) (err error) { - entry, config, err = describeNode(ctx, c.client, request) - if err != nil { - return xerrors.WithStackTrace(err) - } + err := retry.Retry(ctx, + func(ctx context.Context) (err error) { + entry, config, err = describeNode(ctx, c.client, request) + if err != nil { + return xerrors.WithStackTrace(err) + } - return nil - }, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry())) + return nil + }, + retry.WithStackTrace(), + retry.WithIdempotent(true), + retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), + ) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } diff --git a/internal/coordination/client_test.go b/internal/coordination/client_test.go index d7778be95..dd1b46f76 100644 --- a/internal/coordination/client_test.go +++ b/internal/coordination/client_test.go @@ -128,7 +128,7 @@ func TestDescribeNodeRequest(t *testing.T) { func TestOperationParams(t *testing.T) { for _, tt := range []struct { name string - ctx context.Context + ctx context.Context //nolint:containedctx config interface { OperationTimeout() time.Duration OperationCancelAfter() time.Duration diff --git a/internal/coordination/conversation/conversation.go b/internal/coordination/conversation/conversation.go index c3c8e1863..83e170532 100644 --- a/internal/coordination/conversation/conversation.go +++ b/internal/coordination/conversation/conversation.go @@ -424,7 +424,12 @@ func (c *Controller) OnAttach() { delete(c.conflicts, req.conflictKey) } - req.requestSent = nil + // If the request has been canceled, re-send the cancellation message, otherwise re-send the original one. + if req.canceled { + req.cancelRequestSent = nil + } else { + req.requestSent = nil + } notify = true } } diff --git a/internal/coordination/session.go b/internal/coordination/session.go index 9e7e2bb23..5c6a17ac5 100644 --- a/internal/coordination/session.go +++ b/internal/coordination/session.go @@ -23,7 +23,7 @@ type session struct { options *options.CreateSessionOptions client *Client - ctx context.Context + ctx context.Context //nolint:containedctx cancel context.CancelFunc sessionClosedChan chan struct{} controller *conversation.Controller @@ -37,7 +37,7 @@ type session struct { type lease struct { session *session name string - ctx context.Context + ctx context.Context //nolint:containedctx cancel context.CancelFunc } diff --git a/internal/meta/context_test.go b/internal/meta/context_test.go index 1bfdabcc5..6d3ae817e 100644 --- a/internal/meta/context_test.go +++ b/internal/meta/context_test.go @@ -11,7 +11,7 @@ import ( func TestContext(t *testing.T) { for _, tt := range []struct { name string - ctx context.Context + ctx context.Context //nolint:containedctx header string values []string }{ diff --git a/internal/operation/params_test.go b/internal/operation/params_test.go index 605453279..74b136b96 100644 --- a/internal/operation/params_test.go +++ b/internal/operation/params_test.go @@ -14,7 +14,7 @@ import ( func TestParams(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx preferContextTimeout bool timeout time.Duration cancelAfter time.Duration diff --git a/internal/query/options/retry.go b/internal/query/options/retry.go index b604152e3..a5fe4f8d2 100644 --- a/internal/query/options/retry.go +++ b/internal/query/options/retry.go @@ -3,16 +3,15 @@ package options import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var ( - _ DoOption = idempotentOption{} - _ DoOption = labelOption("") + _ DoOption = retryOptionsOption(nil) _ DoOption = traceOption{} - _ DoTxOption = idempotentOption{} - _ DoTxOption = labelOption("") + _ DoTxOption = retryOptionsOption(nil) _ DoTxOption = traceOption{} _ DoTxOption = doTxSettingsOption{} ) @@ -36,9 +35,8 @@ type ( txSettings tx.Settings } - idempotentOption struct{} - labelOption string - traceOption struct { + retryOptionsOption []retry.Option + traceOption struct { t *trace.Query } doTxSettingsOption struct { @@ -62,14 +60,6 @@ func (s *doTxSettings) TxSettings() tx.Settings { return s.txSettings } -func (opt idempotentOption) applyDoTxOption(s *doTxSettings) { - s.doOpts = append(s.doOpts, opt) -} - -func (idempotentOption) applyDoOption(s *doSettings) { - s.retryOpts = append(s.retryOpts, retry.WithIdempotent(true)) -} - func (opt traceOption) applyDoOption(s *doSettings) { s.trace = s.trace.Compose(opt.t) } @@ -78,12 +68,12 @@ func (opt traceOption) applyDoTxOption(s *doTxSettings) { s.doOpts = append(s.doOpts, opt) } -func (opt labelOption) applyDoOption(s *doSettings) { - s.retryOpts = append(s.retryOpts, retry.WithLabel(string(opt))) +func (opts retryOptionsOption) applyDoOption(s *doSettings) { + s.retryOpts = append(s.retryOpts, opts...) } -func (opt labelOption) applyDoTxOption(s *doTxSettings) { - s.doOpts = append(s.doOpts, opt) +func (opts retryOptionsOption) applyDoTxOption(s *doTxSettings) { + s.doOpts = append(s.doOpts, opts) } func (opt doTxSettingsOption) applyDoTxOption(opts *doTxSettings) { @@ -94,18 +84,22 @@ func WithTxSettings(txSettings tx.Settings) doTxSettingsOption { return doTxSettingsOption{txSettings: txSettings} } -func WithIdempotent() idempotentOption { - return idempotentOption{} +func WithIdempotent() retryOptionsOption { + return []retry.Option{retry.WithIdempotent(true)} } -func WithLabel(lbl string) labelOption { - return labelOption(lbl) +func WithLabel(lbl string) retryOptionsOption { + return []retry.Option{retry.WithLabel(lbl)} } func WithTrace(t *trace.Query) traceOption { return traceOption{t: t} } +func WithRetryBudget(b budget.Budget) retryOptionsOption { + return []retry.Option{retry.WithBudget(b)} +} + func ParseDoOpts(t *trace.Query, opts ...DoOption) (s *doSettings) { s = &doSettings{ trace: t, diff --git a/internal/query/row.go b/internal/query/row.go index 476b6aa14..a9db3f61b 100644 --- a/internal/query/row.go +++ b/internal/query/row.go @@ -14,7 +14,7 @@ import ( var _ query.Row = (*row)(nil) type row struct { - ctx context.Context + ctx context.Context //nolint:containedctx trace *trace.Query indexedScanner scanner.IndexedScanner @@ -35,8 +35,11 @@ func newRow(ctx context.Context, columns []*Ydb.Column, v *Ydb.Value, t *trace.Q } func (r row) Scan(dst ...interface{}) (err error) { - onDone := trace.QueryOnRowScan(r.trace, &r.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"), + var ( + ctx = r.ctx + onDone = trace.QueryOnRowScan(r.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.Scan"), + ) ) defer func() { onDone(err) @@ -46,8 +49,11 @@ func (r row) Scan(dst ...interface{}) (err error) { } func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) { - onDone := trace.QueryOnRowScanNamed(r.trace, &r.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"), + var ( + ctx = r.ctx + onDone = trace.QueryOnRowScanNamed(r.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanNamed"), + ) ) defer func() { onDone(err) @@ -57,8 +63,11 @@ func (r row) ScanNamed(dst ...scanner.NamedDestination) (err error) { } func (r row) ScanStruct(dst interface{}, opts ...scanner.ScanStructOption) (err error) { - onDone := trace.QueryOnRowScanStruct(r.trace, &r.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"), + var ( + ctx = r.ctx + onDone = trace.QueryOnRowScanStruct(r.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.row.ScanStruct"), + ) ) defer func() { onDone(err) diff --git a/internal/ratelimiter/client.go b/internal/ratelimiter/client.go index eec07b118..c1db4c0ce 100644 --- a/internal/ratelimiter/client.go +++ b/internal/ratelimiter/client.go @@ -63,6 +63,7 @@ func (c *Client) CreateResource( retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) } @@ -112,6 +113,7 @@ func (c *Client) AlterResource( retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) } @@ -161,6 +163,7 @@ func (c *Client) DropResource( retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) } @@ -206,6 +209,7 @@ func (c *Client) ListResource( retry.WithIdempotent(true), retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) return list, err @@ -265,6 +269,7 @@ func (c *Client) DescribeResource( retry.WithIdempotent(true), retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) return @@ -333,6 +338,7 @@ func (c *Client) AcquireResource( return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) } diff --git a/internal/scheme/client.go b/internal/scheme/client.go index 6f46ed736..154c44eed 100644 --- a/internal/scheme/client.go +++ b/internal/scheme/client.go @@ -22,7 +22,7 @@ import ( var errNilClient = xerrors.Wrap(errors.New("scheme client is not initialized")) type Client struct { - config config.Config + config *config.Config service Ydb_Scheme_V1.SchemeServiceClient } @@ -38,7 +38,7 @@ func (c *Client) Close(_ context.Context) error { return nil } -func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) *Client { +func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client { return &Client{ config: config, service: Ydb_Scheme_V1.NewSchemeServiceClient(cc), @@ -64,6 +64,7 @@ func (c *Client) MakeDirectory(ctx context.Context, path string) (finalErr error retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) } @@ -103,6 +104,7 @@ func (c *Client) RemoveDirectory(ctx context.Context, path string) (finalErr err retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) } @@ -144,6 +146,7 @@ func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Direc retry.WithIdempotent(true), retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) return d, xerrors.WithStackTrace(err) @@ -207,6 +210,7 @@ func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry, retry.WithIdempotent(true), retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) return e, xerrors.WithStackTrace(err) @@ -268,6 +272,7 @@ func (c *Client) ModifyPermissions( retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) } diff --git a/internal/scheme/config/config.go b/internal/scheme/config/config.go index a684fc83e..13865ffba 100644 --- a/internal/scheme/config/config.go +++ b/internal/scheme/config/config.go @@ -14,12 +14,12 @@ type Config struct { } // Trace returns trace over scheme client calls -func (c Config) Trace() *trace.Scheme { +func (c *Config) Trace() *trace.Scheme { return c.trace } // Database returns database name -func (c Config) Database() string { +func (c *Config) Database() string { return c.databaseName } @@ -46,13 +46,13 @@ func With(config config.Common) Option { } } -func New(opts ...Option) Config { - c := Config{ +func New(opts ...Option) *Config { + c := &Config{ trace: &trace.Scheme{}, } for _, opt := range opts { if opt != nil { - opt(&c) + opt(c) } } diff --git a/internal/scripting/client.go b/internal/scripting/client.go index a8ecc18bb..92b88b895 100644 --- a/internal/scripting/client.go +++ b/internal/scripting/client.go @@ -60,6 +60,7 @@ func (c *Client) Execute( err = retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) return r, xerrors.WithStackTrace(err) @@ -139,6 +140,7 @@ func (c *Client) Explain( retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) return e, xerrors.WithStackTrace(err) @@ -213,6 +215,7 @@ func (c *Client) StreamExecute( err = retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), ) return r, xerrors.WithStackTrace(err) diff --git a/internal/table/retry.go b/internal/table/retry.go index d6577d7ad..e2b522b45 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -99,6 +99,7 @@ func (c *Client) retryOptions(opts ...table.Option) *table.Options { ), RetryOptions: []retry.Option{ retry.WithTrace(c.config.TraceRetry()), + retry.WithBudget(c.config.RetryBudget()), }, } for _, opt := range opts { diff --git a/internal/table/scanner/result_test.go b/internal/table/scanner/result_test.go index dc81b0203..b7619cc11 100644 --- a/internal/table/scanner/result_test.go +++ b/internal/table/scanner/result_test.go @@ -207,7 +207,7 @@ func NewResultSet(a *allocator.Allocator, opts ...ResultSetOption) *Ydb.ResultSe func TestNewStreamWithRecvFirstResultSet(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx recvCounter int err error }{ diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 2318d97b1..4609df17a 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -87,6 +87,7 @@ func (c *Client) Alter(ctx context.Context, path string, opts ...topicoptions.Al return retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithTrace(c.cfg.TraceRetry()), + retry.WithBudget(c.cfg.RetryBudget()), ) } @@ -119,6 +120,7 @@ func (c *Client) Create( return retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithTrace(c.cfg.TraceRetry()), + retry.WithBudget(c.cfg.RetryBudget()), ) } @@ -156,6 +158,7 @@ func (c *Client) Describe( err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithTrace(c.cfg.TraceRetry()), + retry.WithBudget(c.cfg.RetryBudget()), ) } else { err = call(ctx) @@ -192,6 +195,7 @@ func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.Dro return retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithTrace(c.cfg.TraceRetry()), + retry.WithBudget(c.cfg.RetryBudget()), ) } diff --git a/internal/topic/topicreaderinternal/partition_session.go b/internal/topic/topicreaderinternal/partition_session.go index 9e23c5495..781de1782 100644 --- a/internal/topic/topicreaderinternal/partition_session.go +++ b/internal/topic/topicreaderinternal/partition_session.go @@ -24,7 +24,7 @@ type partitionSession struct { readerID int64 connectionID string - ctx context.Context + ctx context.Context //nolint:containedctx ctxCancel context.CancelFunc partitionSessionID rawtopicreader.PartitionSessionID diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index b499221d1..bc2dc865c 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -102,7 +102,6 @@ func NewReader( cfg.OperationTimeout(), cfg.RetrySettings, cfg.Trace, - cfg.BaseContext, ), defaultBatchConfig: cfg.DefaultBatchConfig, tracer: cfg.Trace, diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index fec186ffc..8eb91e0ac 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -34,7 +34,7 @@ type partitionSessionID = rawtopicreader.PartitionSessionID type topicStreamReaderImpl struct { cfg topicStreamReaderConfig - ctx context.Context + ctx context.Context //nolint:containedctx cancel context.CancelFunc freeBytes chan int @@ -60,7 +60,7 @@ type topicStreamReaderImpl struct { type topicStreamReaderConfig struct { CommitterBatchTimeLag time.Duration CommitterBatchCounterTrigger int - BaseContext context.Context + BaseContext context.Context //nolint:containedctx BufferSizeProtoBytes int Cred credentials.Credentials CredUpdateInterval time.Duration @@ -179,7 +179,7 @@ func (r *topicStreamReaderImpl) ReadMessageBatch( ) (batch *PublicBatch, err error) { onDone := trace.TopicOnReaderReadMessages( r.cfg.Trace, - ctx, + &ctx, opts.MinCount, opts.MaxCount, r.getRestBufferBytes(), @@ -295,15 +295,18 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer( return err } - onDone := trace.TopicOnReaderPartitionReadStopResponse( - r.cfg.Trace, - r.readConnectionID, - session.Context(), - session.Topic, - session.PartitionID, - session.partitionSessionID.ToInt64(), - msg.CommittedOffset.ToInt64(), - msg.Graceful, + var ( + ctx = session.Context() + onDone = trace.TopicOnReaderPartitionReadStopResponse( + r.cfg.Trace, + r.readConnectionID, + &ctx, + session.Topic, + session.PartitionID, + session.partitionSessionID.ToInt64(), + msg.CommittedOffset.ToInt64(), + msg.Graceful, + ) ) defer func() { onDone(err) @@ -357,7 +360,7 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRa session := commitRange.partitionSession onDone := trace.TopicOnReaderCommit( r.cfg.Trace, - ctx, + &ctx, session.Topic, session.PartitionID, session.partitionSessionID.ToInt64(), @@ -768,13 +771,16 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer( return err } - onDone := trace.TopicOnReaderPartitionReadStartResponse( - r.cfg.Trace, - r.readConnectionID, - session.Context(), - session.Topic, - session.PartitionID, - session.partitionSessionID.ToInt64(), + var ( + ctx = session.Context() + onDone = trace.TopicOnReaderPartitionReadStartResponse( + r.cfg.Trace, + r.readConnectionID, + &ctx, + session.Topic, + session.PartitionID, + session.partitionSessionID.ToInt64(), + ) ) respMessage := &rawtopicreader.StartPartitionSessionResponse{ diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index a518c2058..22fb2997a 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -379,7 +379,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll expected := trace.TopicReaderPartitionReadStopResponseStartInfo{ ReaderConnectionID: e.reader.readConnectionID, - PartitionContext: e.partitionSession.ctx, + PartitionContext: &e.partitionSession.ctx, Topic: e.partitionSession.Topic, PartitionID: e.partitionSession.PartitionID, PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(), @@ -388,7 +388,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { } require.Equal(t, expected, info) - require.NoError(t, info.PartitionContext.Err()) + require.NoError(t, (*info.PartitionContext).Err()) readMessagesCtxCancel() @@ -424,7 +424,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { e.reader.cfg.Trace.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll expected := trace.TopicReaderPartitionReadStopResponseStartInfo{ ReaderConnectionID: e.reader.readConnectionID, - PartitionContext: e.partitionSession.ctx, + PartitionContext: &e.partitionSession.ctx, Topic: e.partitionSession.Topic, PartitionID: e.partitionSession.PartitionID, PartitionSessionID: e.partitionSession.partitionSessionID.ToInt64(), @@ -432,7 +432,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) { Graceful: false, } require.Equal(t, expected, info) - require.Error(t, info.PartitionContext.Err()) + require.Error(t, (*info.PartitionContext).Err()) readMessagesCtxCancel() @@ -954,7 +954,7 @@ func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) { } type streamEnv struct { - ctx context.Context + ctx context.Context //nolint:containedctx t testing.TB reader *topicStreamReaderImpl stopReadEvents empty.Chan diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index ec601ba67..f43044624 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -31,7 +31,6 @@ type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error) type readerReconnector struct { background background.Worker clock clockwork.Clock - baseContext context.Context retrySettings topic.RetrySettings streamVal batchedStreamReader streamErr error @@ -49,14 +48,12 @@ type readerReconnector struct { initDone bool } -//nolint:revive func newReaderReconnector( readerID int64, connector readerConnectFunc, connectTimeout time.Duration, retrySettings topic.RetrySettings, tracer *trace.Topic, - baseContext context.Context, ) *readerReconnector { res := &readerReconnector{ readerID: readerID, @@ -65,7 +62,6 @@ func newReaderReconnector( streamErr: errUnconnected, connectTimeout: connectTimeout, tracer: tracer, - baseContext: baseContext, retrySettings: retrySettings, } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 4bf18c6ff..68eae0466 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -821,7 +821,7 @@ func isClosed(ch <-chan struct{}) bool { } type testEnv struct { - ctx context.Context + ctx context.Context //nolint:containedctx stream *MockRawTopicWriterStream writer *WriterReconnector sendFromServerChannel chan sendFromServerResponse diff --git a/internal/version/version.go b/internal/version/version.go index 0238c9656..1a50462e0 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -2,8 +2,8 @@ package version const ( Major = "3" - Minor = "65" - Patch = "1" + Minor = "66" + Patch = "0" Prefix = "ydb-go-sdk" ) diff --git a/internal/wait/wait.go b/internal/wait/wait.go deleted file mode 100644 index 11a01dee6..000000000 --- a/internal/wait/wait.go +++ /dev/null @@ -1,52 +0,0 @@ -package wait - -import ( - "context" - "time" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" -) - -// waitBackoff is a helper function that waits for i-th Backoff b or ctx -// expiration. -// It returns non-nil error if and only if deadline expiration branch wins. -func waitBackoff(ctx context.Context, b backoff.Backoff, i int) error { - t := time.NewTimer(b.Delay(i)) - defer t.Stop() - - select { - case <-t.C: - return nil - case <-ctx.Done(): - if err := ctx.Err(); err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - } -} - -func Wait(ctx context.Context, fastBackoff, slowBackoff backoff.Backoff, t backoff.Type, i int) error { - var b backoff.Backoff - switch t { - case backoff.TypeNoBackoff: - if err := ctx.Err(); err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - case backoff.TypeFast: - if fastBackoff == nil { - fastBackoff = backoff.Fast - } - b = fastBackoff - case backoff.TypeSlow: - if slowBackoff == nil { - slowBackoff = backoff.Slow - } - b = slowBackoff - } - - return waitBackoff(ctx, b, i) -} diff --git a/internal/xcontext/context_with_cancel.go b/internal/xcontext/context_with_cancel.go index 1deeac4bb..f8cc68923 100644 --- a/internal/xcontext/context_with_cancel.go +++ b/internal/xcontext/context_with_cancel.go @@ -16,8 +16,8 @@ func WithCancel(ctx context.Context) (context.Context, context.CancelFunc) { } type cancelCtx struct { - parentCtx context.Context - ctx context.Context + parentCtx context.Context //nolint:containedctx + ctx context.Context //nolint:containedctx ctxCancel context.CancelFunc m sync.Mutex diff --git a/internal/xcontext/context_with_timeout.go b/internal/xcontext/context_with_timeout.go index 5342798fe..b120fbd85 100644 --- a/internal/xcontext/context_with_timeout.go +++ b/internal/xcontext/context_with_timeout.go @@ -2,6 +2,7 @@ package xcontext import ( "context" + "errors" "sync" "time" @@ -19,8 +20,8 @@ func WithTimeout(ctx context.Context, t time.Duration) (context.Context, context } type timeoutCtx struct { - parentCtx context.Context - ctx context.Context + parentCtx context.Context //nolint:containedctx + ctx context.Context //nolint:containedctx ctxCancel context.CancelFunc from string @@ -44,7 +45,7 @@ func (ctx *timeoutCtx) Err() error { return ctx.err } - if ctx.ctx.Err() == context.DeadlineExceeded && ctx.parentCtx.Err() == nil { //nolint:errorlint + if errors.Is(ctx.ctx.Err(), context.DeadlineExceeded) && ctx.parentCtx.Err() == nil { ctx.err = errFrom(context.DeadlineExceeded, ctx.from) return ctx.err diff --git a/internal/xcontext/without_deadline.go b/internal/xcontext/without_deadline.go index 2e80a284f..4e14655f9 100644 --- a/internal/xcontext/without_deadline.go +++ b/internal/xcontext/without_deadline.go @@ -5,7 +5,9 @@ import ( "time" ) -type valueOnlyContext struct{ context.Context } +type valueOnlyContext struct { + context.Context //nolint:containedctx +} func (valueOnlyContext) Deadline() (deadline time.Time, ok bool) { return } diff --git a/internal/xerrors/join.go b/internal/xerrors/join.go index 80ec421c1..a1e33a054 100644 --- a/internal/xerrors/join.go +++ b/internal/xerrors/join.go @@ -6,8 +6,8 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring" ) -func Join(errs ...error) joinError { - return joinError{ +func Join(errs ...error) *joinError { + return &joinError{ errs: errs, } } @@ -16,7 +16,7 @@ type joinError struct { errs []error } -func (e joinError) Error() string { +func (e *joinError) Error() string { b := xstring.Buffer() defer b.Free() b.WriteByte('[') @@ -31,7 +31,7 @@ func (e joinError) Error() string { return b.String() } -func (e joinError) As(target interface{}) bool { +func (e *joinError) As(target interface{}) bool { for _, err := range e.errs { if As(err, target) { return true @@ -41,7 +41,7 @@ func (e joinError) As(target interface{}) bool { return false } -func (e joinError) Is(target error) bool { +func (e *joinError) Is(target error) bool { for _, err := range e.errs { if Is(err, target) { return true @@ -51,6 +51,6 @@ func (e joinError) Is(target error) bool { return false } -func (e joinError) Unwrap() []error { +func (e *joinError) Unwrap() []error { return e.errs } diff --git a/internal/xerrors/join_test.go b/internal/xerrors/join_test.go index a9ff7e297..9af1d4dfd 100644 --- a/internal/xerrors/join_test.go +++ b/internal/xerrors/join_test.go @@ -10,7 +10,7 @@ import ( func TestJoin(t *testing.T) { for _, tt := range []struct { - err joinError + err *joinError iss []error ass []interface{} s string diff --git a/internal/xsql/conn.go b/internal/xsql/conn.go index c5d670517..42d5517e1 100644 --- a/internal/xsql/conn.go +++ b/internal/xsql/conn.go @@ -68,7 +68,7 @@ func withTrace(t *trace.DatabaseSQL) connOption { type beginTxFunc func(ctx context.Context, txOptions driver.TxOptions) (currentTx, error) type conn struct { - openConnCtx context.Context + ctx context.Context //nolint:containedctx connector *Connector trace *trace.DatabaseSQL @@ -129,9 +129,9 @@ var ( func newConn(ctx context.Context, c *Connector, s table.ClosableSession, opts ...connOption) *conn { cc := &conn{ - openConnCtx: ctx, - connector: c, - session: s, + ctx: ctx, + connector: c, + session: s, } cc.beginTxFuncs = map[QueryMode]beginTxFunc{ DataQueryMode: cc.beginTx, @@ -169,7 +169,7 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, return &stmt{ conn: c, processor: c, - stmtCtx: ctx, + ctx: ctx, query: query, trace: c.trace, }, nil @@ -414,9 +414,12 @@ func (c *conn) Ping(ctx context.Context) (finalErr error) { func (c *conn) Close() (finalErr error) { if c.closed.CompareAndSwap(false, true) { c.connector.detach(c) - onDone := trace.DatabaseSQLOnConnClose( - c.trace, &c.openConnCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).Close"), + var ( + ctx = c.ctx + onDone = trace.DatabaseSQLOnConnClose( + c.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).Close"), + ) ) defer func() { onDone(finalErr) @@ -424,7 +427,7 @@ func (c *conn) Close() (finalErr error) { if c.currentTx != nil { _ = c.currentTx.Rollback() } - err := c.session.Close(xcontext.ValueOnly(c.openConnCtx)) + err := c.session.Close(xcontext.ValueOnly(ctx)) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -540,7 +543,7 @@ func (c *conn) IsColumnExists(ctx context.Context, tableName, columnName string) return false, xerrors.WithStackTrace(fmt.Errorf("table '%s' not exist", tableName)) } - err = retry.Retry(ctx, func(ctx context.Context) (err error) { + err = c.retryIdempotent(ctx, func(ctx context.Context) (err error) { desc, err := c.session.DescribeTable(ctx, tableName) if err != nil { return err @@ -554,7 +557,7 @@ func (c *conn) IsColumnExists(ctx context.Context, tableName, columnName string) } return nil - }, retry.WithIdempotent(true)) + }) if err != nil { return false, xerrors.WithStackTrace(err) } @@ -575,7 +578,7 @@ func (c *conn) GetColumns(ctx context.Context, tableName string) (columns []stri return nil, xerrors.WithStackTrace(fmt.Errorf("table '%s' not exist", tableName)) } - err = retry.Retry(ctx, func(ctx context.Context) (err error) { + err = c.retryIdempotent(ctx, func(ctx context.Context) (err error) { desc, err := c.session.DescribeTable(ctx, tableName) if err != nil { return err @@ -585,7 +588,7 @@ func (c *conn) GetColumns(ctx context.Context, tableName string) (columns []stri } return nil - }, retry.WithIdempotent(true)) + }) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -614,7 +617,7 @@ func (c *conn) GetColumnType(ctx context.Context, tableName, columnName string) return "", xerrors.WithStackTrace(fmt.Errorf("column '%s' not exist in table '%s'", columnName, tableName)) } - err = retry.Retry(ctx, func(ctx context.Context) (err error) { + err = c.retryIdempotent(ctx, func(ctx context.Context) (err error) { desc, err := c.session.DescribeTable(ctx, tableName) if err != nil { return err @@ -628,7 +631,7 @@ func (c *conn) GetColumnType(ctx context.Context, tableName, columnName string) } return nil - }, retry.WithIdempotent(true)) + }) if err != nil { return "", xerrors.WithStackTrace(err) } @@ -649,7 +652,7 @@ func (c *conn) GetPrimaryKeys(ctx context.Context, tableName string) (pkCols []s return nil, xerrors.WithStackTrace(fmt.Errorf("table '%s' not exist", tableName)) } - err = retry.Retry(ctx, func(ctx context.Context) (err error) { + err = c.retryIdempotent(ctx, func(ctx context.Context) (err error) { desc, err := c.session.DescribeTable(ctx, tableName) if err != nil { return err @@ -657,7 +660,7 @@ func (c *conn) GetPrimaryKeys(ctx context.Context, tableName string) (pkCols []s pkCols = append(pkCols, desc.PrimaryKey...) return nil - }, retry.WithIdempotent(true)) + }) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -726,11 +729,11 @@ func (c *conn) getTables(ctx context.Context, absPath string, recursive, exclude } var d scheme.Directory - err := retry.Retry(ctx, func(ctx context.Context) (err error) { + err := c.retryIdempotent(ctx, func(ctx context.Context) (err error) { d, err = c.connector.parent.Scheme().ListDirectory(ctx, absPath) return err - }, retry.WithIdempotent(true)) + }) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -763,11 +766,11 @@ func (c *conn) GetTables(ctx context.Context, folder string, recursive, excludeS absPath := c.normalizePath(folder) var e scheme.Entry - err := retry.Retry(ctx, func(ctx context.Context) (err error) { + err := c.retryIdempotent(ctx, func(ctx context.Context) (err error) { e, err = c.connector.parent.Scheme().DescribePath(ctx, absPath) return err - }, retry.WithIdempotent(true)) + }) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -806,7 +809,7 @@ func (c *conn) GetIndexes(ctx context.Context, tableName string) (indexes []stri return nil, xerrors.WithStackTrace(fmt.Errorf("table '%s' not exist", tableName)) } - err = retry.Retry(ctx, func(ctx context.Context) (err error) { + err = c.retryIdempotent(ctx, func(ctx context.Context) (err error) { desc, err := c.session.DescribeTable(ctx, tableName) if err != nil { return err @@ -816,7 +819,7 @@ func (c *conn) GetIndexes(ctx context.Context, tableName string) (indexes []stri } return nil - }, retry.WithIdempotent(true)) + }) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -824,6 +827,19 @@ func (c *conn) GetIndexes(ctx context.Context, tableName string) (indexes []stri return indexes, nil } +func (c *conn) retryIdempotent(ctx context.Context, f func(ctx context.Context) error) error { + err := retry.Retry(ctx, f, + retry.WithIdempotent(true), + retry.WithTrace(c.connector.traceRetry), + retry.WithBudget(c.connector.retryBudget), + ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil +} + func (c *conn) GetIndexColumns(ctx context.Context, tableName, indexName string) (columns []string, _ error) { tableName = c.normalizePath(tableName) tableExists, err := helpers.IsEntryExists(ctx, @@ -837,7 +853,7 @@ func (c *conn) GetIndexColumns(ctx context.Context, tableName, indexName string) return nil, xerrors.WithStackTrace(fmt.Errorf("table '%s' not exist", tableName)) } - err = retry.Retry(ctx, func(ctx context.Context) (err error) { + err = c.retryIdempotent(ctx, func(ctx context.Context) (err error) { desc, err := c.session.DescribeTable(ctx, tableName) if err != nil { return err @@ -851,7 +867,7 @@ func (c *conn) GetIndexColumns(ctx context.Context, tableName, indexName string) } return xerrors.WithStackTrace(fmt.Errorf("index '%s' not found in table '%s'", indexName, tableName)) - }, retry.WithIdempotent(true)) + }) if err != nil { return nil, xerrors.WithStackTrace(err) } diff --git a/internal/xsql/connector.go b/internal/xsql/connector.go index 8009f66d1..f097c6954 100644 --- a/internal/xsql/connector.go +++ b/internal/xsql/connector.go @@ -15,6 +15,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/meta" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" "github.com/ydb-platform/ydb-go-sdk/v3/scripting" "github.com/ydb-platform/ydb-go-sdk/v3/table" @@ -175,6 +176,20 @@ func WithTraceRetry(t *trace.Retry) ConnectorOption { return traceRetryConnectorOption{t: t} } +type retryBudgetConnectorOption struct { + b budget.Budget +} + +func (l retryBudgetConnectorOption) Apply(c *Connector) error { + c.retryBudget = l.b + + return nil +} + +func WithretryBudget(b budget.Budget) ConnectorOption { + return retryBudgetConnectorOption{b: b} +} + type fakeTxConnectorOption QueryMode func (m fakeTxConnectorOption) Apply(c *Connector) error { @@ -248,8 +263,9 @@ type Connector struct { disableServerBalancer bool idleThreshold time.Duration - trace *trace.DatabaseSQL - traceRetry *trace.Retry + trace *trace.DatabaseSQL + traceRetry *trace.Retry + retryBudget budget.Budget } var ( @@ -353,6 +369,10 @@ func (d *driverWrapper) TraceRetry() *trace.Retry { return d.c.traceRetry } +func (d *driverWrapper) RetryBudget() budget.Budget { + return d.c.retryBudget +} + func (d *driverWrapper) Open(_ string) (driver.Conn, error) { return nil, ErrUnsupported } diff --git a/internal/xsql/stmt.go b/internal/xsql/stmt.go index 75b571acd..6909e55ef 100644 --- a/internal/xsql/stmt.go +++ b/internal/xsql/stmt.go @@ -17,8 +17,8 @@ type stmt struct { driver.ExecerContext driver.QueryerContext } - query string - stmtCtx context.Context + query string + ctx context.Context //nolint:containedctx trace *trace.DatabaseSQL } @@ -29,51 +29,54 @@ var ( _ driver.StmtExecContext = &stmt{} ) -func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ driver.Rows, finalErr error) { - onDone := trace.DatabaseSQLOnStmtQuery(s.trace, &ctx, +func (stmt *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ driver.Rows, finalErr error) { + onDone := trace.DatabaseSQLOnStmtQuery(stmt.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).QueryContext"), - s.stmtCtx, s.query, + stmt.ctx, stmt.query, ) defer func() { onDone(finalErr) }() - if !s.conn.isReady() { + if !stmt.conn.isReady() { return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) } - switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m { + switch m := queryModeFromContext(ctx, stmt.conn.defaultQueryMode); m { case DataQueryMode: - return s.processor.QueryContext(s.conn.withKeepInCache(ctx), s.query, args) + return stmt.processor.QueryContext(stmt.conn.withKeepInCache(ctx), stmt.query, args) default: return nil, fmt.Errorf("unsupported query mode '%s' for execute query on prepared statement", m) } } -func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ driver.Result, finalErr error) { - onDone := trace.DatabaseSQLOnStmtExec(s.trace, &ctx, +func (stmt *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ driver.Result, finalErr error) { + onDone := trace.DatabaseSQLOnStmtExec(stmt.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).ExecContext"), - s.stmtCtx, s.query, + stmt.ctx, stmt.query, ) defer func() { onDone(finalErr) }() - if !s.conn.isReady() { + if !stmt.conn.isReady() { return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) } - switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m { + switch m := queryModeFromContext(ctx, stmt.conn.defaultQueryMode); m { case DataQueryMode: - return s.processor.ExecContext(s.conn.withKeepInCache(ctx), s.query, args) + return stmt.processor.ExecContext(stmt.conn.withKeepInCache(ctx), stmt.query, args) default: return nil, fmt.Errorf("unsupported query mode '%s' for execute query on prepared statement", m) } } -func (s *stmt) NumInput() int { +func (stmt *stmt) NumInput() int { return -1 } -func (s *stmt) Close() (finalErr error) { - onDone := trace.DatabaseSQLOnStmtClose(s.trace, &s.stmtCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).Close"), +func (stmt *stmt) Close() (finalErr error) { + var ( + ctx = stmt.ctx + onDone = trace.DatabaseSQLOnStmtClose(stmt.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*stmt).Close"), + ) ) defer func() { onDone(finalErr) @@ -82,10 +85,10 @@ func (s *stmt) Close() (finalErr error) { return nil } -func (s *stmt) Exec([]driver.Value) (driver.Result, error) { +func (stmt *stmt) Exec([]driver.Value) (driver.Result, error) { return nil, errDeprecated } -func (s *stmt) Query([]driver.Value) (driver.Rows, error) { +func (stmt *stmt) Query([]driver.Value) (driver.Rows, error) { return nil, errDeprecated } diff --git a/internal/xsql/tx.go b/internal/xsql/tx.go index d67813437..2a58e2728 100644 --- a/internal/xsql/tx.go +++ b/internal/xsql/tx.go @@ -14,9 +14,9 @@ import ( ) type tx struct { - conn *conn - txCtx context.Context - tx table.Transaction + conn *conn + ctx context.Context //nolint:containedctx + tx table.Transaction } var ( @@ -45,9 +45,9 @@ func (c *conn) beginTx(ctx context.Context, txOptions driver.TxOptions) (current return nil, badconn.Map(xerrors.WithStackTrace(err)) } c.currentTx = &tx{ - conn: c, - txCtx: ctx, - tx: transaction, + conn: c, + ctx: ctx, + tx: transaction, } return c.currentTx, nil @@ -73,9 +73,12 @@ func (tx *tx) checkTxState() error { } func (tx *tx) Commit() (finalErr error) { - onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.txCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Commit"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxCommit(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Commit"), + tx, + ) ) defer func() { onDone(finalErr) @@ -86,7 +89,7 @@ func (tx *tx) Commit() (finalErr error) { defer func() { tx.conn.currentTx = nil }() - _, err := tx.tx.CommitTx(tx.txCtx) + _, err := tx.tx.CommitTx(tx.ctx) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -95,9 +98,12 @@ func (tx *tx) Commit() (finalErr error) { } func (tx *tx) Rollback() (finalErr error) { - onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.txCtx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Rollback"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxRollback(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).Rollback"), + tx, + ) ) defer func() { onDone(finalErr) @@ -108,7 +114,7 @@ func (tx *tx) Rollback() (finalErr error) { defer func() { tx.conn.currentTx = nil }() - err := tx.tx.Rollback(tx.txCtx) + err := tx.tx.Rollback(tx.ctx) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -121,7 +127,7 @@ func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.Name ) { onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).QueryContext"), - tx.txCtx, tx, query, + tx.ctx, tx, query, ) defer func() { onDone(finalErr) @@ -163,7 +169,7 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named ) { onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).ExecContext"), - tx.txCtx, tx, query, + tx.ctx, tx, query, ) defer func() { onDone(finalErr) @@ -197,7 +203,7 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named func (tx *tx) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*tx).PrepareContext"), - &tx.txCtx, tx, query, + tx.ctx, tx, query, ) defer func() { onDone(finalErr) @@ -209,7 +215,7 @@ func (tx *tx) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, return &stmt{ conn: tx.conn, processor: tx, - stmtCtx: ctx, + ctx: ctx, query: query, trace: tx.conn.trace, }, nil diff --git a/internal/xsql/tx_fake.go b/internal/xsql/tx_fake.go index 459aba718..0fc3efd27 100644 --- a/internal/xsql/tx_fake.go +++ b/internal/xsql/tx_fake.go @@ -12,15 +12,15 @@ import ( ) type txFake struct { - beginCtx context.Context + beginCtx context.Context //nolint:containedctx conn *conn - ctx context.Context + ctx context.Context //nolint:containedctx } func (tx *txFake) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).PrepareContext"), - &tx.beginCtx, tx, query, + tx.beginCtx, tx, query, ) defer func() { onDone(finalErr) @@ -32,7 +32,7 @@ func (tx *txFake) PrepareContext(ctx context.Context, query string) (_ driver.St return &stmt{ conn: tx.conn, processor: tx, - stmtCtx: ctx, + ctx: ctx, query: query, trace: tx.conn.trace, }, nil @@ -57,9 +57,12 @@ func (tx *txFake) ID() string { } func (tx *txFake) Commit() (err error) { - onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Commit"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxCommit(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Commit"), + tx, + ) ) defer func() { onDone(err) @@ -75,9 +78,12 @@ func (tx *txFake) Commit() (err error) { } func (tx *txFake) Rollback() (err error) { - onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Rollback"), - tx, + var ( + ctx = tx.ctx + onDone = trace.DatabaseSQLOnTxRollback(tx.conn.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*txFake).Rollback"), + tx, + ) ) defer func() { onDone(err) diff --git a/log/context.go b/log/context.go index 81231df01..b724e5ec3 100644 --- a/log/context.go +++ b/log/context.go @@ -1,6 +1,8 @@ package log -import "context" +import ( + "context" +) type ( ctxLevelKey struct{} @@ -18,7 +20,11 @@ func LevelFromContext(ctx context.Context) Level { } func WithNames(ctx context.Context, names ...string) context.Context { - return context.WithValue(ctx, ctxNamesKey{}, append(NamesFromContext(ctx), names...)) + // trim capacity for force allocate new memory while append and prevent data race + oldNames := NamesFromContext(ctx) + oldNames = oldNames[:len(oldNames):len(oldNames)] + + return context.WithValue(ctx, ctxNamesKey{}, append(oldNames, names...)) } func NamesFromContext(ctx context.Context) []string { @@ -27,7 +33,7 @@ func NamesFromContext(ctx context.Context) []string { return []string{} } - return v + return v[:len(v):len(v)] // prevent re } func with(ctx context.Context, lvl Level, names ...string) context.Context { diff --git a/log/context_test.go b/log/context_test.go index 3d8c05067..0f71d9b38 100644 --- a/log/context_test.go +++ b/log/context_test.go @@ -2,14 +2,18 @@ package log import ( "context" + "strconv" "testing" + "time" "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) func TestLevelFromContext(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx lvl Level }{ { @@ -33,7 +37,7 @@ func TestLevelFromContext(t *testing.T) { func TestNamesFromContext(t *testing.T) { for _, tt := range []struct { - ctx context.Context + ctx context.Context //nolint:containedctx names []string }{ { @@ -54,3 +58,35 @@ func TestNamesFromContext(t *testing.T) { }) } } + +func TestWithNamesRaceRegression(t *testing.T) { + count := 100 + xtest.TestManyTimes(t, func(t testing.TB) { + ctx := WithNames(context.Background(), "test") + ctx = WithNames(ctx, "test") + ctx = WithNames(ctx, "test") + res := make([]context.Context, count) + + start := make(chan bool) + finished := make(chan bool) + for i := 0; i < count; i++ { + go func(index int) { + <-start + res[index] = WithNames(ctx, strconv.Itoa(index)) + finished <- true + }(i) + } + + time.Sleep(time.Microsecond) + close(start) + + for i := 0; i < count; i++ { + <-finished + } + + for i := 0; i < count; i++ { + expected := []string{"test", "test", "test", strconv.Itoa(i)} + require.Equal(t, expected, NamesFromContext(res[i])) + } + }) +} diff --git a/options.go b/options.go index 305f700fc..8dcce2969 100644 --- a/options.go +++ b/options.go @@ -25,6 +25,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql" "github.com/ydb-platform/ydb-go-sdk/v3/log" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -294,6 +295,17 @@ func WithDiscoveryInterval(discoveryInterval time.Duration) Option { } } +// WithRetryBudget sets retry budget for all calls of all retryers. +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func WithRetryBudget(b budget.Budget) Option { + return func(ctx context.Context, c *Driver) error { + c.options = append(c.options, config.WithRetryBudget(b)) + + return nil + } +} + // WithTraceDriver appends trace.Driver into driver traces func WithTraceDriver(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nolint:gocritic return func(ctx context.Context, c *Driver) error { diff --git a/query/client.go b/query/client.go index 87fedc52b..e53bda415 100644 --- a/query/client.go +++ b/query/client.go @@ -5,6 +5,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -66,3 +67,8 @@ func WithTrace(t *trace.Query) bothDoAndDoTxOption { func WithLabel(lbl string) bothDoAndDoTxOption { return options.WithLabel(lbl) } + +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func WithRetryBudget(b budget.Budget) bothDoAndDoTxOption { + return options.WithRetryBudget(b) +} diff --git a/retry/budget/budget.go b/retry/budget/budget.go new file mode 100644 index 000000000..2f2625178 --- /dev/null +++ b/retry/budget/budget.go @@ -0,0 +1,118 @@ +package budget + +import ( + "context" + "fmt" + "time" + + "github.com/jonboulle/clockwork" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand" +) + +type ( + // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental + Budget interface { + // Acquire will called on second and subsequent retry attempts + Acquire(ctx context.Context) error + } + fixedBudget struct { + clock clockwork.Clock + ticker clockwork.Ticker + quota chan struct{} + done chan struct{} + } + fixedBudgetOption func(q *fixedBudget) + percentBudget struct { + percent int + rand xrand.Rand + } +) + +func withFixedBudgetClock(clock clockwork.Clock) fixedBudgetOption { + return func(q *fixedBudget) { + q.clock = clock + } +} + +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func Limited(attemptsPerSecond int, opts ...fixedBudgetOption) *fixedBudget { + q := &fixedBudget{ + clock: clockwork.NewRealClock(), + done: make(chan struct{}), + } + for _, opt := range opts { + opt(q) + } + if attemptsPerSecond <= 0 { + q.quota = make(chan struct{}) + close(q.quota) + } else { + q.quota = make(chan struct{}, attemptsPerSecond) + for range make([]struct{}, attemptsPerSecond) { + q.quota <- struct{}{} + } + q.ticker = q.clock.NewTicker(time.Second / time.Duration(attemptsPerSecond)) + go func() { + defer close(q.quota) + for { + select { + case <-q.ticker.Chan(): + select { + case q.quota <- struct{}{}: + case <-q.done: + return + } + case <-q.done: + return + } + } + }() + } + + return q +} + +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func (q *fixedBudget) Stop() { + if q.ticker != nil { + q.ticker.Stop() + } + close(q.done) +} + +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func (q *fixedBudget) Acquire(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return xerrors.WithStackTrace(err) + } + select { + case <-q.done: + return xerrors.WithStackTrace(errClosedBudget) + case <-q.quota: + return nil + case <-ctx.Done(): + return xerrors.WithStackTrace(ctx.Err()) + } +} + +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func Percent(percent int) *percentBudget { + if percent > 100 || percent < 0 { + panic(fmt.Sprintf("wrong percent value: %d", percent)) + } + + return &percentBudget{ + percent: percent, + rand: xrand.New(xrand.WithLock()), + } +} + +func (b *percentBudget) Acquire(ctx context.Context) error { + if b.rand.Int(100) < b.percent { //nolint:gomnd + return nil + } + + return ErrNoQuota +} diff --git a/retry/budget/budget_test.go b/retry/budget/budget_test.go new file mode 100644 index 000000000..5850a3ee7 --- /dev/null +++ b/retry/budget/budget_test.go @@ -0,0 +1,72 @@ +package budget + +import ( + "context" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" +) + +func TestUnlimitedBudget(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + ctx, cancel := xcontext.WithCancel(xtest.Context(t)) + q := Limited(-1) + require.NoError(t, q.Acquire(ctx)) + cancel() + require.ErrorIs(t, q.Acquire(ctx), context.Canceled) + }) +} + +func TestLimited(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + ctx, cancel := xcontext.WithCancel(xtest.Context(t)) + clock := clockwork.NewFakeClock() + q := Limited(1, withFixedBudgetClock(clock)) + defer q.Stop() + require.NoError(t, q.Acquire(ctx)) + acquireCh := make(chan struct{}) + go func() { + err := q.Acquire(ctx) + acquireCh <- struct{}{} + require.NoError(t, err) + }() + timeCh := make(chan struct{}) + go func() { + clock.Advance(time.Second - time.Nanosecond) + timeCh <- struct{}{} + clock.Advance(time.Nanosecond) + }() + select { + case <-acquireCh: + require.Fail(t, "") + case <-timeCh: + } + <-acquireCh + cancel() + require.ErrorIs(t, q.Acquire(ctx), context.Canceled) + }) +} + +func TestPercent(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + var ( + total = 1000000 + percent = 0.25 + ctx = xtest.Context(t) + b = Percent(int(percent * 100)) + success int + ) + for i := 0; i < total; i++ { + if b.Acquire(ctx) == nil { + success++ + } + } + require.GreaterOrEqual(t, success, int(float64(total)*(percent-0.1*percent))) + require.LessOrEqual(t, success, int(float64(total)*(percent+0.1*percent))) + }, xtest.StopAfter(5*time.Second)) +} diff --git a/retry/budget/errors.go b/retry/budget/errors.go new file mode 100644 index 000000000..0eeb206f5 --- /dev/null +++ b/retry/budget/errors.go @@ -0,0 +1,16 @@ +package budget + +import ( + "errors" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" +) + +var ( + // ErrNoQuota is a special error for no quota provided by external retry budget + // + // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental + ErrNoQuota = xerrors.Wrap(errors.New("no retry quota")) + + errClosedBudget = xerrors.Wrap(errors.New("retry budget closed")) +) diff --git a/retry/retry.go b/retry/retry.go index 43a112692..c2e601603 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -3,12 +3,13 @@ package retry import ( "context" "fmt" + "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/wait" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -25,6 +26,7 @@ type retryOptions struct { stackTrace bool fastBackoff backoff.Backoff slowBackoff backoff.Backoff + budget budget.Budget panicCallback func(e interface{}) } @@ -124,6 +126,31 @@ func WithTrace(t *trace.Retry) traceOption { return traceOption{t: t} } +var _ Option = budgetOption{} + +type budgetOption struct { + b budget.Budget +} + +func (b budgetOption) ApplyRetryOption(opts *retryOptions) { + opts.budget = b.b +} + +func (b budgetOption) ApplyDoOption(opts *doOptions) { + opts.retryOptions = append(opts.retryOptions, WithBudget(b.b)) +} + +func (b budgetOption) ApplyDoTxOption(opts *doTxOptions) { + opts.retryOptions = append(opts.retryOptions, WithBudget(b.b)) +} + +// WithBudget returns budget option +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func WithBudget(b budget.Budget) budgetOption { + return budgetOption{b: b} +} + var _ Option = idempotentOption(false) type idempotentOption bool @@ -236,6 +263,7 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err options := &retryOptions{ call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/retry.Retry"), trace: &trace.Retry{}, + budget: budget.Limited(-1), fastBackoff: backoff.Fast, slowBackoff: backoff.Slow, } @@ -284,21 +312,14 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err return nil } - if ctxErr := ctx.Err(); ctxErr != nil { - return xerrors.WithStackTrace( - xerrors.Join( - fmt.Errorf("context error occurred on attempt No.%d", attempts), - ctxErr, err, - ), - ) - } - m := Check(err) if m.StatusCode() != code { i = 0 } + code = m.StatusCode() + if !m.MustRetry(options.idempotent) { return xerrors.WithStackTrace( fmt.Errorf("non-retryable error occurred on attempt No.%d (idempotent=%v): %w", @@ -306,14 +327,34 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err ) } - if e := wait.Wait(ctx, options.fastBackoff, options.slowBackoff, m.BackoffType(), i); e != nil { + t := time.NewTimer(backoff.Delay(m.BackoffType(), i, + backoff.WithFastBackoff(options.fastBackoff), + backoff.WithSlowBackoff(options.slowBackoff), + )) + + select { + case <-ctx.Done(): + t.Stop() + return xerrors.WithStackTrace( xerrors.Join( - fmt.Errorf("wait exit on attempt No.%d", attempts), e, err), + fmt.Errorf("attempt No.%d: %w", attempts, ctx.Err()), + err, + ), ) + case <-t.C: + t.Stop() + + if acquireErr := options.budget.Acquire(ctx); acquireErr != nil { + return xerrors.WithStackTrace( + xerrors.Join( + fmt.Errorf("attempt No.%d: %w", attempts, budget.ErrNoQuota), + acquireErr, + err, + ), + ) + } } - - code = m.StatusCode() } } } diff --git a/retry/retry_test.go b/retry/retry_test.go index 75d9d7aad..40e2f0294 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -2,6 +2,7 @@ package retry import ( "context" + "errors" "fmt" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) func TestRetryModes(t *testing.T) { @@ -188,6 +190,26 @@ func TestRetryTransportCancelled(t *testing.T) { } } +type noQuota struct{} + +var errNoQuota = errors.New("no quota") + +func (noQuota) Acquire(ctx context.Context) error { + return errNoQuota +} + +func TestRetryWithBudget(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + quota := noQuota{} + ctx, cancel := context.WithCancel(xtest.Context(t)) + defer cancel() + err := Retry(ctx, func(ctx context.Context) (err error) { + return RetryableError(errors.New("custom error")) + }, WithBudget(quota)) + require.ErrorIs(t, err, errNoQuota) + }) +} + type MockPanicCallback struct { called bool received interface{} diff --git a/retry/sql.go b/retry/sql.go index 9b826a1ca..23706971e 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -8,6 +8,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + budget "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -142,12 +143,14 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err } attempts = 0 ) - if tracer, has := db.Driver().(interface { + if d, has := db.Driver().(interface { TraceRetry() *trace.Retry + RetryBudget() budget.Budget }); has { - options.retryOptions = append(options.retryOptions, nil) - copy(options.retryOptions[1:], options.retryOptions) - options.retryOptions[0] = WithTrace(tracer.TraceRetry()) + options.retryOptions = append(options.retryOptions, nil, nil) + copy(options.retryOptions[2:], options.retryOptions) + options.retryOptions[0] = WithTrace(d.TraceRetry()) + options.retryOptions[1] = WithBudget(d.RetryBudget()) } for _, opt := range opts { if opt != nil { diff --git a/sql.go b/sql.go index 0db01dd72..efe7cf92e 100644 --- a/sql.go +++ b/sql.go @@ -167,6 +167,7 @@ func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) { ), xsql.WithOnClose(d.detach), xsql.WithTraceRetry(parent.config.TraceRetry()), + xsql.WithretryBudget(parent.config.RetryBudget()), )..., ) if err != nil { diff --git a/table/table.go b/table/table.go index 1cb1ed8e0..86e110040 100644 --- a/table/table.go +++ b/table/table.go @@ -11,6 +11,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -520,24 +521,23 @@ var _ Option = retryOptionsOption{} type retryOptionsOption []retry.Option func (retryOptions retryOptionsOption) ApplyTableOption(opts *Options) { - opts.RetryOptions = append(opts.RetryOptions, retry.WithIdempotent(true)) + opts.RetryOptions = append(opts.RetryOptions, retryOptions...) } -func WithRetryOptions(retryOptions []retry.Option) retryOptionsOption { - return retryOptions +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func WithRetryBudget(b budget.Budget) retryOptionsOption { + return []retry.Option{retry.WithBudget(b)} } -var _ Option = idempotentOption{} - -type idempotentOption struct{} - -func (idempotentOption) ApplyTableOption(opts *Options) { - opts.Idempotent = true - opts.RetryOptions = append(opts.RetryOptions, retry.WithIdempotent(true)) +// Deprecated: redundant option +// Will be removed after Oct 2024. +// Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated +func WithRetryOptions(retryOptions []retry.Option) retryOptionsOption { + return retryOptions } -func WithIdempotent() idempotentOption { - return idempotentOption{} +func WithIdempotent() retryOptionsOption { + return []retry.Option{retry.WithIdempotent(true)} } var _ Option = txSettingsOption{} diff --git a/tests/integration/retry_budget_test.go b/tests/integration/retry_budget_test.go new file mode 100644 index 000000000..36e847401 --- /dev/null +++ b/tests/integration/retry_budget_test.go @@ -0,0 +1,108 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "database/sql" + "errors" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" + "github.com/ydb-platform/ydb-go-sdk/v3/table" +) + +type noQuota struct{} + +func (n noQuota) Acquire(ctx context.Context) error { + return errors.New("no quota") +} + +func TestRetryBudget(t *testing.T) { + ctx := xtest.Context(t) + + defaultLimiter := budget.Limited(1) + defer defaultLimiter.Stop() + + nativeDriver, err := ydb.Open(ctx, os.Getenv("YDB_CONNECTION_STRING"), + ydb.WithDiscoveryInterval(time.Second), + ydb.WithRetryBudget(defaultLimiter), + ) + require.NoError(t, err) + + defer func() { + // cleanup + _ = nativeDriver.Close(ctx) + }() + + c, err := ydb.Connector(nativeDriver) + require.NoError(t, err) + + defer func() { + // cleanup + _ = c.Close() + }() + + db := sql.OpenDB(c) + defer func() { + // cleanup + _ = db.Close() + }() + + retryBudget := noQuota{} + + t.Run("retry.Retry", func(t *testing.T) { + err := retry.Retry(ctx, func(ctx context.Context) (err error) { + return retry.RetryableError(errors.New("custom error")) + }, retry.WithBudget(retryBudget)) + require.ErrorIs(t, err, budget.ErrNoQuota) + }) + t.Run("retry.Do", func(t *testing.T) { + err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) (err error) { + return retry.RetryableError(errors.New("custom error")) + }, retry.WithBudget(retryBudget)) + require.ErrorIs(t, err, budget.ErrNoQuota) + }) + t.Run("retry.DoTx", func(t *testing.T) { + err := retry.DoTx(ctx, db, func(ctx context.Context, tx *sql.Tx) (err error) { + return retry.RetryableError(errors.New("custom error")) + }, retry.WithBudget(retryBudget)) + require.ErrorIs(t, err, budget.ErrNoQuota) + }) + t.Run("db.Table().Do", func(t *testing.T) { + err := nativeDriver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + return retry.RetryableError(errors.New("custom error")) + }, table.WithRetryBudget(retryBudget)) + require.ErrorIs(t, err, budget.ErrNoQuota) + }) + t.Run("db.Table().DoTx", func(t *testing.T) { + err := nativeDriver.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error { + return retry.RetryableError(errors.New("custom error")) + }, table.WithRetryBudget(retryBudget)) + require.ErrorIs(t, err, budget.ErrNoQuota) + }) + if version.Gte(os.Getenv("YDB_VERSION"), "24.1") { + t.Run("db.Query().Do", func(t *testing.T) { + err := nativeDriver.Query().Do(ctx, func(ctx context.Context, s query.Session) error { + return retry.RetryableError(errors.New("custom error")) + }, query.WithRetryBudget(retryBudget)) + require.ErrorIs(t, err, budget.ErrNoQuota) + }) + t.Run("db.Query().DoTx", func(t *testing.T) { + err := nativeDriver.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error { + return retry.RetryableError(errors.New("custom error")) + }, query.WithRetryBudget(retryBudget)) + require.ErrorIs(t, err, budget.ErrNoQuota) + }) + } +} diff --git a/tests/slo/database/sql/storage.go b/tests/slo/database/sql/storage.go index 8c740803c..0fb188e54 100755 --- a/tests/slo/database/sql/storage.go +++ b/tests/slo/database/sql/storage.go @@ -9,6 +9,7 @@ import ( ydb "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -74,12 +75,19 @@ type Storage struct { dropQuery string upsertQuery string selectQuery string + retryBudget interface { + budget.Budget + + Stop() + } } func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (s *Storage, err error) { ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:gomnd defer cancel() + retryBudget := budget.Limited(int(float64(poolSize) * 0.1)) //nolint:gomnd + s = &Storage{ cfg: cfg, createQuery: fmt.Sprintf(createTemplate, cfg.Table, @@ -87,11 +95,13 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (s *Stora dropQuery: fmt.Sprintf(dropTemplate, cfg.Table), upsertQuery: fmt.Sprintf(upsertTemplate, cfg.Table), selectQuery: fmt.Sprintf(selectTemplate, cfg.Table), + retryBudget: retryBudget, } s.cc, err = ydb.Open( ctx, s.cfg.Endpoint+s.cfg.DB, + ydb.WithRetryBudget(retryBudget), ) if err != nil { return nil, fmt.Errorf("ydb.Open error: %w", err) @@ -224,6 +234,8 @@ func (s *Storage) dropTable(ctx context.Context) error { } func (s *Storage) close(ctx context.Context) error { + s.retryBudget.Stop() + if err := ctx.Err(); err != nil { return err } diff --git a/tests/slo/go.mod b/tests/slo/go.mod index f0f99850d..31a355a50 100644 --- a/tests/slo/go.mod +++ b/tests/slo/go.mod @@ -39,8 +39,8 @@ require ( github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0 // indirect github.com/ydb-platform/ydb-go-yc v0.10.2 // indirect github.com/ydb-platform/ydb-go-yc-metadata v0.5.3 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.16.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 // indirect diff --git a/tests/slo/go.sum b/tests/slo/go.sum index 59f24a544..901f5c500 100644 --- a/tests/slo/go.sum +++ b/tests/slo/go.sum @@ -1387,8 +1387,9 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1539,8 +1540,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/tests/slo/native/query/storage.go b/tests/slo/native/query/storage.go index 4c0132adc..a806775fd 100755 --- a/tests/slo/native/query/storage.go +++ b/tests/slo/native/query/storage.go @@ -5,13 +5,12 @@ import ( "errors" "fmt" "io" - "os" "path" "time" ydb "github.com/ydb-platform/ydb-go-sdk/v3" - "github.com/ydb-platform/ydb-go-sdk/v3/log" "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/trace" "slo/internal/config" @@ -19,9 +18,14 @@ import ( ) type Storage struct { - db *ydb.Driver - cfg *config.Config - tablePath string + db *ydb.Driver + cfg *config.Config + tablePath string + retryBudget interface { + budget.Budget + + Stop() + } } const writeQuery = ` @@ -69,10 +73,12 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:gomnd defer cancel() + retryBudget := budget.Limited(int(float64(poolSize) * 0.1)) //nolint:gomnd + db, err := ydb.Open(ctx, cfg.Endpoint+cfg.DB, ydb.WithSessionPoolSizeLimit(poolSize), - ydb.WithLogger(log.Default(os.Stderr, log.WithMinLevel(log.ERROR)), trace.DetailsAll), + ydb.WithRetryBudget(retryBudget), ) if err != nil { return nil, err @@ -81,9 +87,10 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage prefix := path.Join(db.Name(), label) s := &Storage{ - db: db, - cfg: cfg, - tablePath: "`" + path.Join(prefix, cfg.Table) + "`", + db: db, + cfg: cfg, + tablePath: "`" + path.Join(prefix, cfg.Table) + "`", + retryBudget: retryBudget, } return s, nil @@ -254,6 +261,8 @@ func (s *Storage) dropTable(ctx context.Context) error { } func (s *Storage) close(ctx context.Context) error { + s.retryBudget.Stop() + var ( shutdownCtx context.Context shutdownCancel context.CancelFunc diff --git a/tests/slo/native/table/storage.go b/tests/slo/native/table/storage.go index 220054aa0..affd5f60b 100755 --- a/tests/slo/native/table/storage.go +++ b/tests/slo/native/table/storage.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" @@ -62,16 +63,24 @@ type Storage struct { prefix string upsertQuery string selectQuery string + retryBudget interface { + budget.Budget + + Stop() + } } func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage, error) { ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:gomnd defer cancel() + retryBudget := budget.Limited(int(float64(poolSize) * 0.1)) //nolint:gomnd + db, err := ydb.Open( ctx, cfg.Endpoint+cfg.DB, ydb.WithSessionPoolSizeLimit(poolSize), + ydb.WithRetryBudget(retryBudget), ) if err != nil { return nil, err @@ -85,6 +94,7 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage prefix: prefix, upsertQuery: fmt.Sprintf(upsertTemplate, prefix, cfg.Table), selectQuery: fmt.Sprintf(selectTemplate, prefix, cfg.Table), + retryBudget: retryBudget, } return s, nil @@ -245,6 +255,8 @@ func (s *Storage) dropTable(ctx context.Context) error { } func (s *Storage) close(ctx context.Context) error { + s.retryBudget.Stop() + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.ShutdownTime)*time.Second) defer cancel() diff --git a/trace/sql.go b/trace/sql.go index 9ee597a39..f18e53826 100644 --- a/trace/sql.go +++ b/trace/sql.go @@ -101,7 +101,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - TxContext *context.Context + TxContext context.Context //nolint:containedctx Tx tableTransactionInfo Query string } @@ -193,7 +193,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - TxContext context.Context + TxContext context.Context //nolint:containedctx Tx tableTransactionInfo Query string } @@ -209,7 +209,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - TxContext context.Context + TxContext context.Context //nolint:containedctx Tx tableTransactionInfo Query string } @@ -262,7 +262,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - StmtContext context.Context + StmtContext context.Context //nolint:containedctx Query string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -277,7 +277,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - StmtContext context.Context + StmtContext context.Context //nolint:containedctx Query string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals diff --git a/trace/sql_gtrace.go b/trace/sql_gtrace.go index c4a686e1d..1b7fd0867 100644 --- a/trace/sql_gtrace.go +++ b/trace/sql_gtrace.go @@ -1054,7 +1054,7 @@ func DatabaseSQLOnTxExec(t *DatabaseSQL, c *context.Context, call call, txContex } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func DatabaseSQLOnTxPrepare(t *DatabaseSQL, c *context.Context, call call, txContext *context.Context, tx tableTransactionInfo, query string) func(error) { +func DatabaseSQLOnTxPrepare(t *DatabaseSQL, c *context.Context, call call, txContext context.Context, tx tableTransactionInfo, query string) func(error) { var p DatabaseSQLTxPrepareStartInfo p.Context = c p.Call = call diff --git a/trace/topic.go b/trace/topic.go index 53077a3b8..ad20bc980 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -96,7 +96,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderPartitionReadStartResponseStartInfo struct { ReaderConnectionID string - PartitionContext context.Context + PartitionContext *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -118,7 +118,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderPartitionReadStopResponseStartInfo struct { ReaderConnectionID string - PartitionContext context.Context + PartitionContext *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -197,7 +197,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReadMessagesStartInfo struct { - RequestContext context.Context + RequestContext *context.Context MinCount int MaxCount int FreeBufferCapacity int @@ -239,7 +239,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCommitStartInfo struct { - RequestContext context.Context + RequestContext *context.Context Topic string PartitionID int64 PartitionSessionID int64 diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index 5f9dcde80..2f3d4a6b4 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -1021,7 +1021,7 @@ func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { t.onReaderReconnectRequest(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { +func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo p.ReaderConnectionID = readerConnectionID p.PartitionContext = partitionContext @@ -1038,7 +1038,7 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { +func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo p.ReaderConnectionID = readerConnectionID p.PartitionContext = partitionContext @@ -1055,7 +1055,7 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderCommit(t *Topic, requestContext context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { +func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo p.RequestContext = requestContext p.Topic = topic @@ -1162,7 +1162,7 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReadMessages(t *Topic, requestContext context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { +func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo p.RequestContext = requestContext p.MinCount = minCount