Skip to content

Commit

Permalink
Merge pull request #1184 from ydb-platform/retry-quoter
Browse files Browse the repository at this point in the history
added experimental `retry/budget` package
  • Loading branch information
asmyasnikov authored Apr 23, 2024
2 parents d716a3b + 7c3c1a7 commit b50dba0
Show file tree
Hide file tree
Showing 33 changed files with 772 additions and 164 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added experimental package `retry/budget` for limit second and subsequent retry attempts
* Refactored internals for enabling `containedctx` linter
* Fixed the hanging semaphore issue on coordination session reconnect

Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

Expand Down Expand Up @@ -157,6 +158,13 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol
}
}

// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func WithRetryBudget(b budget.Budget) Option {
return func(c *Config) {
config.SetRetryBudget(&c.Common, b)
}
}

func WithTraceRetry(t *trace.Retry, opts ...trace.RetryComposeOption) Option {
return func(c *Config) {
config.SetTraceRetry(&c.Common, t, opts...)
Expand Down
43 changes: 43 additions & 0 deletions internal/backoff/delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package backoff

import (
"time"
)

type (
delayOptions struct {
fast Backoff
slow Backoff
}
delayOption func(o *delayOptions)
)

func WithFastBackoff(fast Backoff) delayOption {
return func(o *delayOptions) {
o.fast = fast
}
}

func WithSlowBackoff(slow Backoff) delayOption {
return func(o *delayOptions) {
o.slow = slow
}
}

func Delay(t Type, i int, opts ...delayOption) time.Duration {
optionsHolder := delayOptions{
fast: Fast,
slow: Slow,
}
for _, opt := range opts {
opt(&optionsHolder)
}
switch t {
case TypeFast:
return optionsHolder.fast.Delay(i)
case TypeSlow:
return optionsHolder.slow.Delay(i)
default:
return 0
}
}
92 changes: 92 additions & 0 deletions internal/backoff/delay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package backoff

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
)

func TestDelay(t *testing.T) {
for _, tt := range []struct {
name string
act time.Duration
exp time.Duration
}{
{
name: xtest.CurrentFileLine(),
act: Delay(TypeNoBackoff, 0),
exp: 0,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeNoBackoff, 1),
exp: 0,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeNoBackoff, 2),
exp: 0,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeFast, 0, WithFastBackoff(New(
WithSlotDuration(fastSlot),
WithCeiling(6),
WithJitterLimit(1),
))),
exp: 5 * time.Millisecond,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeFast, 1, WithFastBackoff(New(
WithSlotDuration(fastSlot),
WithCeiling(6),
WithJitterLimit(1),
))),
exp: 10 * time.Millisecond,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeFast, 3, WithFastBackoff(New(
WithSlotDuration(fastSlot),
WithCeiling(6),
WithJitterLimit(1),
))),
exp: 40 * time.Millisecond,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeSlow, 0, WithSlowBackoff(New(
WithSlotDuration(slowSlot),
WithCeiling(6),
WithJitterLimit(1),
))),
exp: time.Second,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeSlow, 1, WithSlowBackoff(New(
WithSlotDuration(slowSlot),
WithCeiling(6),
WithJitterLimit(1),
))),
exp: 2 * time.Second,
},
{
name: xtest.CurrentFileLine(),
act: Delay(TypeSlow, 3, WithSlowBackoff(New(
WithSlotDuration(slowSlot),
WithCeiling(6),
WithJitterLimit(1),
))),
exp: 8 * time.Second,
},
} {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.exp, tt.act)
})
}
}
1 change: 1 addition & 0 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
},
retry.WithIdempotent(true),
retry.WithTrace(b.driverConfig.TraceRetry()),
retry.WithBudget(b.driverConfig.RetryBudget()),
)
}

Expand Down
16 changes: 16 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package config
import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var defaultRetryBudget = budget.Limited(-1)

type Common struct {
operationTimeout time.Duration
operationCancelAfter time.Duration
disableAutoRetry bool
traceRetry trace.Retry
retryBudget budget.Budget

panicCallback func(e interface{})
}
Expand Down Expand Up @@ -48,6 +52,14 @@ func (c *Common) TraceRetry() *trace.Retry {
return &c.traceRetry
}

func (c *Common) RetryBudget() budget.Budget {
if c.retryBudget == nil {
return defaultRetryBudget
}

return c.retryBudget
}

// SetOperationTimeout define the maximum amount of time a YDB server will process
// an operation. After timeout exceeds YDB will try to cancel operation and
// regardless of the cancellation appropriate error will be returned to
Expand Down Expand Up @@ -81,3 +93,7 @@ func SetAutoRetry(c *Common, autoRetry bool) {
func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption) {
c.traceRetry = *c.traceRetry.Compose(t, opts...)
}

func SetRetryBudget(c *Common, b budget.Budget) {
c.retryBudget = b
}
56 changes: 40 additions & 16 deletions internal/coordination/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,15 @@ func (c *Client) CreateNode(ctx context.Context, path string, config coordinatio
return createNode(ctx, c.client, request)
}

return retry.Retry(ctx, func(ctx context.Context) error {
return createNode(ctx, c.client, request)
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return retry.Retry(ctx,
func(ctx context.Context) error {
return createNode(ctx, c.client, request)
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithBudget(c.config.RetryBudget()),
)
}

func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) (finalErr error) {
Expand All @@ -137,9 +143,15 @@ func (c *Client) AlterNode(ctx context.Context, path string, config coordination
return xerrors.WithStackTrace(call(ctx))
}

return retry.Retry(ctx, func(ctx context.Context) (err error) {
return alterNode(ctx, c.client, request)
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return retry.Retry(ctx,
func(ctx context.Context) (err error) {
return alterNode(ctx, c.client, request)
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithBudget(c.config.RetryBudget()),
)
}

func alterNodeRequest(
Expand Down Expand Up @@ -192,9 +204,15 @@ func (c *Client) DropNode(ctx context.Context, path string) (finalErr error) {
return xerrors.WithStackTrace(call(ctx))
}

return retry.Retry(ctx, func(ctx context.Context) (err error) {
return dropNode(ctx, c.client, request)
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return retry.Retry(ctx,
func(ctx context.Context) (err error) {
return dropNode(ctx, c.client, request)
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithBudget(c.config.RetryBudget()),
)
}

func dropNodeRequest(path string, operationParams *Ydb_Operations.OperationParams) *Ydb_Coordination.DropNodeRequest {
Expand Down Expand Up @@ -241,14 +259,20 @@ func (c *Client) DescribeNode(
return describeNode(ctx, c.client, request)
}

err := retry.Retry(ctx, func(ctx context.Context) (err error) {
entry, config, err = describeNode(ctx, c.client, request)
if err != nil {
return xerrors.WithStackTrace(err)
}
err := retry.Retry(ctx,
func(ctx context.Context) (err error) {
entry, config, err = describeNode(ctx, c.client, request)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return nil
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithBudget(c.config.RetryBudget()),
)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}
Expand Down
40 changes: 17 additions & 23 deletions internal/query/options/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package options
import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var (
_ DoOption = idempotentOption{}
_ DoOption = labelOption("")
_ DoOption = retryOptionsOption(nil)
_ DoOption = traceOption{}

_ DoTxOption = idempotentOption{}
_ DoTxOption = labelOption("")
_ DoTxOption = retryOptionsOption(nil)
_ DoTxOption = traceOption{}
_ DoTxOption = doTxSettingsOption{}
)
Expand All @@ -36,9 +35,8 @@ type (
txSettings tx.Settings
}

idempotentOption struct{}
labelOption string
traceOption struct {
retryOptionsOption []retry.Option
traceOption struct {
t *trace.Query
}
doTxSettingsOption struct {
Expand All @@ -62,14 +60,6 @@ func (s *doTxSettings) TxSettings() tx.Settings {
return s.txSettings
}

func (opt idempotentOption) applyDoTxOption(s *doTxSettings) {
s.doOpts = append(s.doOpts, opt)
}

func (idempotentOption) applyDoOption(s *doSettings) {
s.retryOpts = append(s.retryOpts, retry.WithIdempotent(true))
}

func (opt traceOption) applyDoOption(s *doSettings) {
s.trace = s.trace.Compose(opt.t)
}
Expand All @@ -78,12 +68,12 @@ func (opt traceOption) applyDoTxOption(s *doTxSettings) {
s.doOpts = append(s.doOpts, opt)
}

func (opt labelOption) applyDoOption(s *doSettings) {
s.retryOpts = append(s.retryOpts, retry.WithLabel(string(opt)))
func (opts retryOptionsOption) applyDoOption(s *doSettings) {
s.retryOpts = append(s.retryOpts, opts...)
}

func (opt labelOption) applyDoTxOption(s *doTxSettings) {
s.doOpts = append(s.doOpts, opt)
func (opts retryOptionsOption) applyDoTxOption(s *doTxSettings) {
s.doOpts = append(s.doOpts, opts)
}

func (opt doTxSettingsOption) applyDoTxOption(opts *doTxSettings) {
Expand All @@ -94,18 +84,22 @@ func WithTxSettings(txSettings tx.Settings) doTxSettingsOption {
return doTxSettingsOption{txSettings: txSettings}
}

func WithIdempotent() idempotentOption {
return idempotentOption{}
func WithIdempotent() retryOptionsOption {
return []retry.Option{retry.WithIdempotent(true)}
}

func WithLabel(lbl string) labelOption {
return labelOption(lbl)
func WithLabel(lbl string) retryOptionsOption {
return []retry.Option{retry.WithLabel(lbl)}
}

func WithTrace(t *trace.Query) traceOption {
return traceOption{t: t}
}

func WithRetryBudget(b budget.Budget) retryOptionsOption {
return []retry.Option{retry.WithBudget(b)}
}

func ParseDoOpts(t *trace.Query, opts ...DoOption) (s *doSettings) {
s = &doSettings{
trace: t,
Expand Down
Loading

0 comments on commit b50dba0

Please sign in to comment.