Skip to content

Commit

Permalink
refactored retry budget sources
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Apr 23, 2024
1 parent 8a96d36 commit 2a635d1
Show file tree
Hide file tree
Showing 25 changed files with 183 additions and 147 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

Expand Down Expand Up @@ -159,7 +159,7 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol
}

// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func WithRetryLimiter(l retry.Limiter) Option {
func WithRetryLimiter(l retry.Budget) Option {
return func(c *Config) {
config.SetRetryLimiter(&c.Common, l)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
},
retry.WithIdempotent(true),
retry.WithTrace(b.driverConfig.TraceRetry()),
retry.WithLimiter(b.driverConfig.RetryLimiter()),
retry.WithBudget(b.driverConfig.RetryLimiter()),
)
}

Expand Down
9 changes: 5 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package config
import (
"time"

retry2 "github.com/ydb-platform/ydb-go-sdk/v3/internal/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var defaultRetryLimiter = retry.Quoter(-1)
var defaultRetryLimiter = retry.Budget(-1)

type Common struct {
operationTimeout time.Duration
operationCancelAfter time.Duration
disableAutoRetry bool
traceRetry trace.Retry
retryLimiter retry.Limiter
retryLimiter retry2.Budget

panicCallback func(e interface{})
}
Expand Down Expand Up @@ -52,7 +53,7 @@ func (c *Common) TraceRetry() *trace.Retry {
return &c.traceRetry
}

func (c *Common) RetryLimiter() retry.Limiter {
func (c *Common) RetryLimiter() retry2.Budget {
if c.retryLimiter == nil {
return defaultRetryLimiter
}
Expand Down Expand Up @@ -94,6 +95,6 @@ func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption)
c.traceRetry = *c.traceRetry.Compose(t, opts...)
}

func SetRetryLimiter(c *Common, l retry.Limiter) {
func SetRetryLimiter(c *Common, l retry2.Budget) {
c.retryLimiter = l
}
8 changes: 4 additions & 4 deletions internal/coordination/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *Client) CreateNode(ctx context.Context, path string, config coordinatio
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -150,7 +150,7 @@ func (c *Client) AlterNode(ctx context.Context, path string, config coordination
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -211,7 +211,7 @@ func (c *Client) DropNode(ctx context.Context, path string) (finalErr error) {
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func (c *Client) DescribeNode(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
Expand Down
5 changes: 3 additions & 2 deletions internal/query/options/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package options

import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
retry2 "github.com/ydb-platform/ydb-go-sdk/v3/internal/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)
Expand Down Expand Up @@ -95,8 +96,8 @@ func WithTrace(t *trace.Query) traceOption {
return traceOption{t: t}
}

func WithLimiter(l retry.Limiter) retryOptionsOption {
return []retry.Option{retry.WithLimiter(l)}
func WithBudget(l retry2.Budget) retryOptionsOption {
return []retry.Option{retry.WithBudget(l)}
}

func ParseDoOpts(t *trace.Query, opts ...DoOption) (s *doSettings) {
Expand Down
12 changes: 6 additions & 6 deletions internal/ratelimiter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *Client) CreateResource(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -113,7 +113,7 @@ func (c *Client) AlterResource(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *Client) DropResource(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -209,7 +209,7 @@ func (c *Client) ListResource(
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)

return list, err
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Client) DescribeResource(
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)

return
Expand Down Expand Up @@ -338,7 +338,7 @@ func (c *Client) AcquireResource(
return retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down
33 changes: 13 additions & 20 deletions retry/quoter.go → internal/retry/budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,36 @@ package retry

import (
"context"
"errors"
"time"

"github.com/jonboulle/clockwork"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)

var (
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
ErrNoQuota = xerrors.Wrap(errors.New("no retry quota"))

_ Limiter = (*rateLimiter)(nil)
)

type (
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
Limiter interface {
Budget interface {
Acquire(ctx context.Context) error
}
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
LimiterStoper interface {
Limiter
Stop()
}
rateLimiter struct {
budget struct {
clock clockwork.Clock
ticker clockwork.Ticker
quota chan struct{}
done chan struct{}
}
rateLimiterOption func(q *rateLimiter)
BudgetOption func(q *budget)
)

func WithBudgetClock(clock clockwork.Clock) BudgetOption {
return func(q *budget) {
q.clock = clock
}
}

// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func Quoter(attemptsPerSecond int, opts ...rateLimiterOption) *rateLimiter {
q := &rateLimiter{
func NewBudget(attemptsPerSecond int, opts ...BudgetOption) *budget {
q := &budget{
clock: clockwork.NewRealClock(),
done: make(chan struct{}),
}
Expand Down Expand Up @@ -75,15 +68,15 @@ func Quoter(attemptsPerSecond int, opts ...rateLimiterOption) *rateLimiter {
}

// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func (q *rateLimiter) Stop() {
func (q *budget) Stop() {
if q.ticker != nil {
q.ticker.Stop()
}
close(q.done)
}

// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func (q *rateLimiter) Acquire(ctx context.Context) error {
func (q *budget) Acquire(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand Down
10 changes: 4 additions & 6 deletions retry/quoter_test.go → internal/retry/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,21 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
)

func TestUnlimitedLimiter(t *testing.T) {
func TestUnlimitedBudget(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
ctx, cancel := xcontext.WithCancel(xtest.Context(t))
q := Quoter(-1)
q := NewBudget(-1)
require.NoError(t, q.Acquire(ctx))
cancel()
require.ErrorIs(t, q.Acquire(ctx), context.Canceled)
})
}

func TestQuoter(t *testing.T) {
func TestBudget(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
ctx, cancel := xcontext.WithCancel(xtest.Context(t))
clock := clockwork.NewFakeClock()
q := Quoter(1, func(q *rateLimiter) {
q.clock = clock
})
q := NewBudget(1, WithBudgetClock(clock))
defer q.Stop()
require.NoError(t, q.Acquire(ctx))
acquireCh := make(chan struct{})
Expand Down
10 changes: 5 additions & 5 deletions internal/scheme/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *Client) MakeDirectory(ctx context.Context, path string) (finalErr error
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -104,7 +104,7 @@ func (c *Client) RemoveDirectory(ctx context.Context, path string) (finalErr err
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Direc
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)

return d, xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -210,7 +210,7 @@ func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry,
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)

return e, xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -272,7 +272,7 @@ func (c *Client) ModifyPermissions(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)
}

Expand Down
6 changes: 3 additions & 3 deletions internal/scripting/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *Client) Execute(
err = retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)

return r, xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (c *Client) Explain(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)

return e, xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -215,7 +215,7 @@ func (c *Client) StreamExecute(
err = retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
)

return r, xerrors.WithStackTrace(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/table/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *Client) retryOptions(opts ...table.Option) *table.Options {
),
RetryOptions: []retry.Option{
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
retry.WithBudget(c.config.RetryLimiter()),
},
}
for _, opt := range opts {
Expand Down
8 changes: 4 additions & 4 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *Client) Alter(ctx context.Context, path string, opts ...topicoptions.Al
return retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
retry.WithBudget(c.cfg.RetryLimiter()),
)
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (c *Client) Create(
return retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
retry.WithBudget(c.cfg.RetryLimiter()),
)
}

Expand Down Expand Up @@ -158,7 +158,7 @@ func (c *Client) Describe(
err = retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
retry.WithBudget(c.cfg.RetryLimiter()),
)
} else {
err = call(ctx)
Expand Down Expand Up @@ -195,7 +195,7 @@ func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.Dro
return retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
retry.WithBudget(c.cfg.RetryLimiter()),
)
}

Expand Down
Loading

0 comments on commit 2a635d1

Please sign in to comment.