diff --git a/internal/query/client.go b/internal/query/client.go index 86d8e938b..3d56dcae8 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -227,7 +227,8 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO defer cancel() var ( - onDone = trace.QueryOnDo(c.config.Trace(), &ctx, + settings = options.ParseDoOpts(c.config.Trace(), opts...) + onDone = trace.QueryOnDo(settings.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Client).Do"), ) attempts = 0 @@ -238,8 +239,6 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO err := do(ctx, c.pool, func(ctx context.Context, s *Session) error { - attempts++ - return op(ctx, s) }, append([]retry.Option{ @@ -250,7 +249,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO } }, }), - }, options.ParseDoOpts(c.config.Trace(), opts...).RetryOpts()...)..., + }, settings.RetryOpts()...)..., ) return err @@ -481,23 +480,18 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options defer cancel() var ( - onDone = trace.QueryOnDoTx(c.config.Trace(), &ctx, + settings = options.ParseDoTxOpts(c.config.Trace(), opts...) + onDone = trace.QueryOnDoTx(settings.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Client).DoTx"), ) - doTxOpts = options.ParseDoTxOpts(opts...) attempts = 0 ) defer func() { onDone(attempts, finalErr) }() - err := doTx(ctx, c.pool, - func(ctx context.Context, tx query.TxActor) error { - attempts++ - - return op(ctx, tx) - }, - doTxOpts.TxSettings(), + err := doTx(ctx, c.pool, op, + settings.TxSettings(), append( []retry.Option{ retry.WithTrace(&trace.Retry{ @@ -508,7 +502,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options }, }), }, - doTxOpts.RetryOpts()..., + settings.RetryOpts()..., )..., ) if err != nil { diff --git a/internal/query/options/retry.go b/internal/query/options/retry.go index 11a458e49..af7c0f207 100644 --- a/internal/query/options/retry.go +++ b/internal/query/options/retry.go @@ -114,10 +114,12 @@ func ParseDoOpts(t *trace.Query, opts ...DoOption) (s *doSettings) { return s } -func ParseDoTxOpts(opts ...DoTxOption) (s *doTxSettings) { +func ParseDoTxOpts(t *trace.Query, opts ...DoTxOption) (s *doTxSettings) { s = &doTxSettings{ txSettings: tx.NewSettings(tx.WithDefaultTxMode()), - doSettings: doSettings{}, + doSettings: doSettings{ + trace: t, + }, } for _, opt := range opts { diff --git a/tests/slo/internal/metrics/metrics.go b/tests/slo/internal/metrics/metrics.go index 49b47eaae..3223d7df3 100644 --- a/tests/slo/internal/metrics/metrics.go +++ b/tests/slo/internal/metrics/metrics.go @@ -2,7 +2,6 @@ package metrics import ( "fmt" - "log" "time" "github.com/prometheus/client_golang/prometheus" @@ -122,21 +121,11 @@ func (m *Metrics) Start(name SpanName) Span { return j } -func (j Span) Stop(err error, attempts int) { +func (j Span) Finish(err error, attempts int) { j.m.inflight.WithLabelValues(j.name).Sub(1) latency := time.Since(j.start) - if attempts > 1 { - log.Printf("more than 1 attempt for request (request_type: %q, attempts: %d, start: %s, latency: %s, err: %v)", - j.name, - attempts, - j.start.Format(time.DateTime), - latency.String(), - err, - ) - } - var ( successLabel = JobStatusOK successCounter = j.m.oks diff --git a/tests/slo/internal/workers/read.go b/tests/slo/internal/workers/read.go index 5794d7c8d..d5280574a 100644 --- a/tests/slo/internal/workers/read.go +++ b/tests/slo/internal/workers/read.go @@ -23,20 +23,17 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter } } -func (w *Workers) read(ctx context.Context) (err error) { +func (w *Workers) read(ctx context.Context) error { id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important - var attempts int - m := w.m.Start(metrics.JobRead) - defer func() { - m.Stop(err, attempts) - if err != nil { - log.Printf("get entry error: %v", err) - } - }() - _, attempts, err = w.s.Read(ctx, id) + _, attempts, err := w.s.Read(ctx, id) + if err != nil { + log.Printf("read failed with %d attempts: %v", attempts, err) + } + + m.Finish(err, attempts) return err } diff --git a/tests/slo/internal/workers/write.go b/tests/slo/internal/workers/write.go index 17a27cba4..0d11ef9b0 100644 --- a/tests/slo/internal/workers/write.go +++ b/tests/slo/internal/workers/write.go @@ -23,26 +23,22 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite } } -func (w *Workers) write(ctx context.Context, gen *generator.Generator) (err error) { - var row generator.Row - row, err = gen.Generate() +func (w *Workers) write(ctx context.Context, gen *generator.Generator) error { + row, err := gen.Generate() if err != nil { log.Printf("generate error: %v", err) return err } - var attempts int - m := w.m.Start(metrics.JobWrite) - defer func() { - m.Stop(err, attempts) - if err != nil { - log.Printf("error when stop 'write' worker: %v", err) - } - }() - attempts, err = w.s.Write(ctx, row) + attempts, err := w.s.Write(ctx, row) + if err != nil { + log.Printf("write failed with %d attempts: %v", attempts, err) + } + + m.Finish(err, attempts) return err } diff --git a/tests/slo/native/query/main.go b/tests/slo/native/query/main.go index fb7301bea..052106153 100644 --- a/tests/slo/native/query/main.go +++ b/tests/slo/native/query/main.go @@ -37,6 +37,11 @@ func main() { ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) defer cancel() + go func() { + <-ctx.Done() + log.Println("exiting...") + }() + s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, label) if err != nil { panic(fmt.Errorf("create storage failed: %w", err))