Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
global retry limiter
Browse files Browse the repository at this point in the history
asmyasnikov committed Apr 8, 2024
1 parent feea9b7 commit e3a69fe
Showing 16 changed files with 170 additions and 53 deletions.
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

@@ -157,6 +158,12 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol
}
}

func WithRetryLimiter(l retry.Limiter) Option {
return func(c *Config) {
config.SetRetryLimiter(&c.Common, l)
}
}

func WithTraceRetry(t *trace.Retry, opts ...trace.RetryComposeOption) Option {
return func(c *Config) {
config.SetTraceRetry(&c.Common, t, opts...)
1 change: 1 addition & 0 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -89,6 +89,7 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
},
retry.WithIdempotent(true),
retry.WithTrace(b.driverConfig.TraceRetry()),
retry.WithLimiter(b.driverConfig.RetryLimiter()),
)
}

16 changes: 16 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -3,14 +3,18 @@ package config
import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var defaultRetryLimiter = retry.Quoter(-1)

type Common struct {
operationTimeout time.Duration
operationCancelAfter time.Duration
disableAutoRetry bool
traceRetry trace.Retry
retryLimiter retry.Limiter

panicCallback func(e interface{})
}
@@ -48,6 +52,14 @@ func (c *Common) TraceRetry() *trace.Retry {
return &c.traceRetry
}

func (c *Common) RetryLimiter() retry.Limiter {
if c.retryLimiter == nil {
return defaultRetryLimiter
}

return c.retryLimiter
}

// SetOperationTimeout define the maximum amount of time a YDB server will process
// an operation. After timeout exceeds YDB will try to cancel operation and
// regardless of the cancellation appropriate error will be returned to
@@ -81,3 +93,7 @@ func SetAutoRetry(c *Common, autoRetry bool) {
func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption) {
c.traceRetry = *c.traceRetry.Compose(t, opts...)
}

func SetRetryLimiter(c *Common, l retry.Limiter) {
c.retryLimiter = l
}
56 changes: 40 additions & 16 deletions internal/coordination/client.go
Original file line number Diff line number Diff line change
@@ -110,9 +110,15 @@ func (c *Client) CreateNode(ctx context.Context, path string, config coordinatio
return createNode(ctx, c.client, request)
}

return retry.Retry(ctx, func(ctx context.Context) error {
return createNode(ctx, c.client, request)
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return retry.Retry(ctx,
func(ctx context.Context) error {
return createNode(ctx, c.client, request)
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) (finalErr error) {
@@ -137,9 +143,15 @@ func (c *Client) AlterNode(ctx context.Context, path string, config coordination
return xerrors.WithStackTrace(call(ctx))
}

return retry.Retry(ctx, func(ctx context.Context) (err error) {
return alterNode(ctx, c.client, request)
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return retry.Retry(ctx,
func(ctx context.Context) (err error) {
return alterNode(ctx, c.client, request)
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

func alterNodeRequest(
@@ -192,9 +204,15 @@ func (c *Client) DropNode(ctx context.Context, path string) (finalErr error) {
return xerrors.WithStackTrace(call(ctx))
}

return retry.Retry(ctx, func(ctx context.Context) (err error) {
return dropNode(ctx, c.client, request)
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return retry.Retry(ctx,
func(ctx context.Context) (err error) {
return dropNode(ctx, c.client, request)
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

func dropNodeRequest(path string, operationParams *Ydb_Operations.OperationParams) *Ydb_Coordination.DropNodeRequest {
@@ -241,14 +259,20 @@ func (c *Client) DescribeNode(
return describeNode(ctx, c.client, request)
}

err := retry.Retry(ctx, func(ctx context.Context) (err error) {
entry, config, err = describeNode(ctx, c.client, request)
if err != nil {
return xerrors.WithStackTrace(err)
}
err := retry.Retry(ctx,
func(ctx context.Context) (err error) {
entry, config, err = describeNode(ctx, c.client, request)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}, retry.WithStackTrace(), retry.WithIdempotent(true), retry.WithTrace(c.config.TraceRetry()))
return nil
},
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}
6 changes: 6 additions & 0 deletions internal/ratelimiter/client.go
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ func (c *Client) CreateResource(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

@@ -112,6 +113,7 @@ func (c *Client) AlterResource(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

@@ -161,6 +163,7 @@ func (c *Client) DropResource(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

@@ -206,6 +209,7 @@ func (c *Client) ListResource(
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)

return list, err
@@ -265,6 +269,7 @@ func (c *Client) DescribeResource(
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)

return
@@ -333,6 +338,7 @@ func (c *Client) AcquireResource(
return retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

9 changes: 7 additions & 2 deletions internal/scheme/client.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ import (
var errNilClient = xerrors.Wrap(errors.New("scheme client is not initialized"))

type Client struct {
config config.Config
config *config.Config
service Ydb_Scheme_V1.SchemeServiceClient
}

@@ -38,7 +38,7 @@ func (c *Client) Close(_ context.Context) error {
return nil
}

func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client {
return &Client{
config: config,
service: Ydb_Scheme_V1.NewSchemeServiceClient(cc),
@@ -64,6 +64,7 @@ func (c *Client) MakeDirectory(ctx context.Context, path string) (finalErr error
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

@@ -103,6 +104,7 @@ func (c *Client) RemoveDirectory(ctx context.Context, path string) (finalErr err
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

@@ -144,6 +146,7 @@ func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Direc
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)

return d, xerrors.WithStackTrace(err)
@@ -207,6 +210,7 @@ func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry,
retry.WithIdempotent(true),
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)

return e, xerrors.WithStackTrace(err)
@@ -268,6 +272,7 @@ func (c *Client) ModifyPermissions(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)
}

10 changes: 5 additions & 5 deletions internal/scheme/config/config.go
Original file line number Diff line number Diff line change
@@ -14,12 +14,12 @@ type Config struct {
}

// Trace returns trace over scheme client calls
func (c Config) Trace() *trace.Scheme {
func (c *Config) Trace() *trace.Scheme {
return c.trace
}

// Database returns database name
func (c Config) Database() string {
func (c *Config) Database() string {
return c.databaseName
}

@@ -46,13 +46,13 @@ func With(config config.Common) Option {
}
}

func New(opts ...Option) Config {
c := Config{
func New(opts ...Option) *Config {
c := &Config{
trace: &trace.Scheme{},
}
for _, opt := range opts {
if opt != nil {
opt(&c)
opt(c)
}
}

3 changes: 3 additions & 0 deletions internal/scripting/client.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ func (c *Client) Execute(
err = retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)

return r, xerrors.WithStackTrace(err)
@@ -139,6 +140,7 @@ func (c *Client) Explain(
retry.WithStackTrace(),
retry.WithIdempotent(true),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)

return e, xerrors.WithStackTrace(err)
@@ -213,6 +215,7 @@ func (c *Client) StreamExecute(
err = retry.Retry(ctx, call,
retry.WithStackTrace(),
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
)

return r, xerrors.WithStackTrace(err)
1 change: 1 addition & 0 deletions internal/table/retry.go
Original file line number Diff line number Diff line change
@@ -99,6 +99,7 @@ func (c *Client) retryOptions(opts ...table.Option) *table.Options {
),
RetryOptions: []retry.Option{
retry.WithTrace(c.config.TraceRetry()),
retry.WithLimiter(c.config.RetryLimiter()),
},
}
for _, opt := range opts {
4 changes: 4 additions & 0 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ func (c *Client) Alter(ctx context.Context, path string, opts ...topicoptions.Al
return retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
)
}

@@ -119,6 +120,7 @@ func (c *Client) Create(
return retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
)
}

@@ -156,6 +158,7 @@ func (c *Client) Describe(
err = retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
)
} else {
err = call(ctx)
@@ -192,6 +195,7 @@ func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.Dro
return retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithLimiter(c.cfg.RetryLimiter()),
)
}

Loading

0 comments on commit e3a69fe

Please sign in to comment.