Skip to content

Commit

Permalink
Refactored retry.doOption and retry.doTxOption types
Browse files Browse the repository at this point in the history
Marked as deprecated `retry.WithDoRetryOptions` and `retry.WithDoTxRetryOptions`
  • Loading branch information
asmyasnikov committed Oct 5, 2023
1 parent aebceb5 commit 1b5075f
Show file tree
Hide file tree
Showing 30 changed files with 365 additions and 238 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Marked as deprecated `retry.WithDoRetryOptions` and `retry.WithDoTxRetryOptions`
* Added receiving first result set on construct `internal/table/scanner.NewStream()`
* Added experimental package `metrics` with SDK metrics
* Fixed redundant trace call for finished `database/sql` transactions
Expand Down
8 changes: 3 additions & 5 deletions SQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ err := retry.Do(context.TODO(), db, func(ctx context.Context, cc *sql.Conn) erro
}
...
return nil // good final of retry operation
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))

```

Expand All @@ -296,9 +296,7 @@ err := retry.DoTx(context.TODO(), db, func(ctx context.Context, tx *sql.Tx) erro
}
...
return nil // good final of retry tx operation
}, retry.WithDoTxRetryOptions(
retry.WithIdempotent(true),
), retry.WithTxOptions(&sql.TxOptions{
}, retry.WithIdempotent(true), retry.WithTxOptions(&sql.TxOptions{
Isolation: sql.LevelSnapshot,
ReadOnly: true,
}))
Expand Down Expand Up @@ -568,7 +566,7 @@ err := retry.Do(context.TODO(), db, func(ctx context.Context, cc *sql.Conn) erro
}
...
return nil // good final of retry operation
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
```
## Troubleshooting <a name="troubleshooting"></a>
Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func Example_databaseSQL() {
}
log.Printf("id=%v, myStr='%s'\n", id, myStr)
return nil
}, retry.WithDoTxRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
log.Printf("query failed: %v", err)
}
Expand Down
15 changes: 7 additions & 8 deletions examples/basic/database_sql/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
Expand All @@ -31,7 +30,7 @@ func selectDefault(ctx context.Context, db *sql.DB) (err error) {
}
log.Printf("AST = %s\n\nPlan = %s", ast, plan)
return nil
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
return fmt.Errorf("explain query failed: %w", err)
}
Expand Down Expand Up @@ -62,7 +61,7 @@ func selectDefault(ctx context.Context, db *sql.DB) (err error) {
)
}
return rows.Err()
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
return fmt.Errorf("execute data query failed: %w", err)
}
Expand Down Expand Up @@ -155,7 +154,7 @@ func selectScan(ctx context.Context, db *sql.DB) (err error) {
)
}
return rows.Err()
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
return fmt.Errorf("scan query failed: %w", err)
}
Expand Down Expand Up @@ -202,7 +201,7 @@ func fillTablesWithData(ctx context.Context, db *sql.DB) (err error) {
return err
}
return nil
}, retry.WithDoTxRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
return fmt.Errorf("upsert query failed: %w", err)
}
Expand Down Expand Up @@ -235,7 +234,7 @@ func prepareSchema(ctx context.Context, db *sql.DB) (err error) {
return err
}
return nil
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
return fmt.Errorf("create table failed: %w", err)
}
Expand Down Expand Up @@ -266,7 +265,7 @@ func prepareSchema(ctx context.Context, db *sql.DB) (err error) {
return err
}
return nil
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
return fmt.Errorf("create table failed: %w", err)
}
Expand Down Expand Up @@ -299,7 +298,7 @@ func prepareSchema(ctx context.Context, db *sql.DB) (err error) {
return err
}
return nil
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
}, retry.WithIdempotent(true))
if err != nil {
return fmt.Errorf("create table failed: %w", err)
}
Expand Down
36 changes: 18 additions & 18 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,31 +241,31 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
}
return s, nil
}
options := retryOptions(c.config.Trace(), opts...)
err = retry.Retry(
ctx,
err = retry.Retry(ctx,
func(ctx context.Context) (err error) {
s, err = createSession(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}
return nil
},
retry.WithIdempotent(true),
retry.WithID("CreateSession"),
retry.WithFastBackoff(options.FastBackoff),
retry.WithSlowBackoff(options.SlowBackoff),
retry.WithTrace(trace.Retry{
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context)
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
onDone := onIntermediate(info.Error)
return func(info trace.RetryLoopDoneInfo) {
onDone(s, info.Attempts, info.Error)
}
}
},
}),
append(
[]retry.Option{
retry.WithIdempotent(true),
retry.WithID("CreateSession"),
retry.WithTrace(trace.Retry{
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context)
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
onDone := onIntermediate(info.Error)
return func(info trace.RetryLoopDoneInfo) {
onDone(s, info.Attempts, info.Error)
}
}
},
}),
}, retryOptions(c.config.Trace(), opts...).RetryOptions...,
)...,
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand Down
27 changes: 9 additions & 18 deletions internal/table/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package table
import (
"context"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
Expand Down Expand Up @@ -50,18 +49,14 @@ func doTx(
attempts, onIntermediate := 0, trace.TableOnDoTx(
opts.Trace,
&ctx,
opts.ID,
opts.Idempotent,
isRetryCalledAbove(ctx),
)
defer func() {
onIntermediate(err)(attempts, err)
}()
err = retryBackoff(
ctx,
c,
opts.FastBackoff,
opts.SlowBackoff,
opts.Idempotent,
err = retryBackoff(ctx, c,
func(ctx context.Context, s table.Session) (err error) {
attempts++

Expand Down Expand Up @@ -110,6 +105,7 @@ func doTx(

return nil
},
opts.RetryOptions...,
)
if err != nil {
return xerrors.WithStackTrace(err)
Expand All @@ -127,11 +123,11 @@ func do(
if opts.Trace == nil {
opts.Trace = &trace.Table{}
}
attempts, onIntermediate := 0, trace.TableOnDo(opts.Trace, &ctx, opts.Idempotent, isRetryCalledAbove(ctx))
attempts, onIntermediate := 0, trace.TableOnDo(opts.Trace, &ctx, opts.ID, opts.Idempotent, isRetryCalledAbove(ctx))
defer func() {
onIntermediate(err)(attempts, err)
}()
return retryBackoff(ctx, c, opts.FastBackoff, opts.SlowBackoff, opts.Idempotent,
return retryBackoff(ctx, c,
func(ctx context.Context, s table.Session) (err error) {
attempts++

Expand All @@ -156,16 +152,15 @@ func do(

return nil
},
opts.RetryOptions...,
)
}

func retryBackoff(
ctx context.Context,
p SessionProvider,
fastBackoff backoff.Backoff,
slowBackoff backoff.Backoff,
isOperationIdempotent bool,
op table.Operation,
opts ...retry.Option,
) (err error) {
err = retry.Retry(markRetryCall(ctx),
func(ctx context.Context) (err error) {
Expand All @@ -188,9 +183,7 @@ func retryBackoff(

return nil
},
retry.WithFastBackoff(fastBackoff),
retry.WithSlowBackoff(slowBackoff),
retry.WithIdempotent(isOperationIdempotent),
opts...,
)
if err != nil {
return xerrors.WithStackTrace(err)
Expand All @@ -200,9 +193,7 @@ func retryBackoff(

func retryOptions(trace *trace.Table, opts ...table.Option) *table.Options {
options := &table.Options{
Trace: trace,
FastBackoff: backoff.Fast,
SlowBackoff: backoff.Slow,
Trace: trace,
TxSettings: table.TxSettings(
table.WithSerializableReadWrite(),
),
Expand Down
44 changes: 28 additions & 16 deletions internal/table/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,22 @@ func TestRetryerBackoffRetryCancelation(t *testing.T) {
return testErr
},
&table.Options{
FastBackoff: testutil.BackoffFunc(func(n int) <-chan time.Time {
ch := make(chan time.Time)
backoff <- ch
return ch
}),
SlowBackoff: testutil.BackoffFunc(func(n int) <-chan time.Time {
ch := make(chan time.Time)
backoff <- ch
return ch
}),
RetryOptions: []retry.Option{
retry.WithFastBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
ch := make(chan time.Time)
backoff <- ch
return ch
}),
),
retry.WithSlowBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
ch := make(chan time.Time)
backoff <- ch
return ch
}),
),
},
},
)
results <- err
Expand Down Expand Up @@ -207,12 +213,18 @@ func TestRetryerImmediateReturn(t *testing.T) {
return testErr
},
&table.Options{
FastBackoff: testutil.BackoffFunc(func(n int) <-chan time.Time {
panic("this code will not be called")
}),
SlowBackoff: testutil.BackoffFunc(func(n int) <-chan time.Time {
panic("this code will not be called")
}),
RetryOptions: []retry.Option{
retry.WithFastBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
panic("this code will not be called")
}),
),
retry.WithSlowBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
panic("this code will not be called")
}),
),
},
},
)
if !xerrors.Is(err, testErr) {
Expand Down
42 changes: 36 additions & 6 deletions metrics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ func table(config Config) (t trace.Table) {
wait := config.WithSystem("pool").GaugeVec("wait")
waitLatency := config.WithSystem("pool").WithSystem("wait").TimerVec("latency")
alive := config.GaugeVec("sessions", "node_id")
doAttempts := config.WithSystem("do").HistogramVec("attempts", []float64{0, 1, 2, 5, 10})
doErrors := config.WithSystem("do").WithSystem("intermediate").CounterVec("errors", "status")
doTxAttempts := config.WithSystem("doTx").HistogramVec("attempts", []float64{0, 1, 2, 5, 10})
doTxErrors := config.WithSystem("doTx").WithSystem("intermediate").CounterVec("errors", "status")
doAttempts := config.WithSystem("do").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name")
doErrors := config.WithSystem("do").CounterVec("errors", "status", "name")
doIntermediateErrors := config.WithSystem("do").WithSystem("intermediate").CounterVec("errors", "status", "name")
doLatency := config.WithSystem("do").TimerVec("latency", "status", "name")
doTxAttempts := config.WithSystem("doTx").HistogramVec("attempts", []float64{0, 1, 2, 5, 10}, "name")
doTxIntermediateErrors := config.WithSystem("doTx").WithSystem("intermediate").CounterVec("errors", "status", "name")
doTxErrors := config.WithSystem("doTx").CounterVec("errors", "status", "name")
doTxLatency := config.WithSystem("doTx").TimerVec("latency", "status", "name")
t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) {
return func(info trace.TableInitDoneInfo) {
limit.With(nil).Set(float64(info.Limit))
Expand All @@ -31,15 +35,28 @@ func table(config Config) (t trace.Table) {
) func(
trace.TableDoDoneInfo,
) {
var (
name = info.ID
start = time.Now()
)
return func(info trace.TableDoIntermediateInfo) func(trace.TableDoDoneInfo) {
if info.Error != nil && config.Details()&trace.TableEvents != 0 {
doErrors.With(map[string]string{
doIntermediateErrors.With(map[string]string{
"status": errorBrief(info.Error),
"name": name,
}).Inc()
}
return func(info trace.TableDoDoneInfo) {
if config.Details()&trace.TableEvents != 0 {
doAttempts.With(nil).Record(float64(info.Attempts))
doErrors.With(map[string]string{
"status": errorBrief(info.Error),
"name": name,
}).Inc()
doLatency.With(map[string]string{
"status": errorBrief(info.Error),
"name": name,
}).Record(time.Since(start))
}
}
}
Expand All @@ -49,15 +66,28 @@ func table(config Config) (t trace.Table) {
) func(
trace.TableDoTxDoneInfo,
) {
var (
name = info.ID
start = time.Now()
)
return func(info trace.TableDoTxIntermediateInfo) func(trace.TableDoTxDoneInfo) {
if info.Error != nil && config.Details()&trace.TableEvents != 0 {
doTxErrors.With(map[string]string{
doTxIntermediateErrors.With(map[string]string{
"status": errorBrief(info.Error),
"name": name,
}).Inc()
}
return func(info trace.TableDoTxDoneInfo) {
if config.Details()&trace.TableEvents != 0 {
doTxAttempts.With(nil).Record(float64(info.Attempts))
doTxErrors.With(map[string]string{
"status": errorBrief(info.Error),
"name": name,
}).Inc()
doTxLatency.With(map[string]string{
"status": errorBrief(info.Error),
"name": name,
}).Record(time.Since(start))
}
}
}
Expand Down
Loading

0 comments on commit 1b5075f

Please sign in to comment.