Skip to content

Commit

Permalink
Merge pull request #1449 from ydb-platform/slo-logs
Browse files Browse the repository at this point in the history
fixed log output for SLO
  • Loading branch information
asmyasnikov authored Sep 9, 2024
2 parents a9b150e + eb4aa2d commit 3be1b9c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 50 deletions.
22 changes: 8 additions & 14 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
defer cancel()

var (
onDone = trace.QueryOnDo(c.config.Trace(), &ctx,
settings = options.ParseDoOpts(c.config.Trace(), opts...)
onDone = trace.QueryOnDo(settings.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Client).Do"),
)
attempts = 0
Expand All @@ -238,8 +239,6 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO

err := do(ctx, c.pool,
func(ctx context.Context, s *Session) error {
attempts++

return op(ctx, s)
},
append([]retry.Option{
Expand All @@ -250,7 +249,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
}
},
}),
}, options.ParseDoOpts(c.config.Trace(), opts...).RetryOpts()...)...,
}, settings.RetryOpts()...)...,
)

return err
Expand Down Expand Up @@ -481,23 +480,18 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
defer cancel()

var (
onDone = trace.QueryOnDoTx(c.config.Trace(), &ctx,
settings = options.ParseDoTxOpts(c.config.Trace(), opts...)
onDone = trace.QueryOnDoTx(settings.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Client).DoTx"),
)
doTxOpts = options.ParseDoTxOpts(opts...)
attempts = 0
)
defer func() {
onDone(attempts, finalErr)
}()

err := doTx(ctx, c.pool,
func(ctx context.Context, tx query.TxActor) error {
attempts++

return op(ctx, tx)
},
doTxOpts.TxSettings(),
err := doTx(ctx, c.pool, op,
settings.TxSettings(),
append(
[]retry.Option{
retry.WithTrace(&trace.Retry{
Expand All @@ -508,7 +502,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
},
}),
},
doTxOpts.RetryOpts()...,
settings.RetryOpts()...,
)...,
)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions internal/query/options/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,12 @@ func ParseDoOpts(t *trace.Query, opts ...DoOption) (s *doSettings) {
return s
}

func ParseDoTxOpts(opts ...DoTxOption) (s *doTxSettings) {
func ParseDoTxOpts(t *trace.Query, opts ...DoTxOption) (s *doTxSettings) {
s = &doTxSettings{
txSettings: tx.NewSettings(tx.WithDefaultTxMode()),
doSettings: doSettings{},
doSettings: doSettings{
trace: t,
},
}

for _, opt := range opts {
Expand Down
13 changes: 1 addition & 12 deletions tests/slo/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package metrics

import (
"fmt"
"log"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -122,21 +121,11 @@ func (m *Metrics) Start(name SpanName) Span {
return j
}

func (j Span) Stop(err error, attempts int) {
func (j Span) Finish(err error, attempts int) {
j.m.inflight.WithLabelValues(j.name).Sub(1)

latency := time.Since(j.start)

if attempts > 1 {
log.Printf("more than 1 attempt for request (request_type: %q, attempts: %d, start: %s, latency: %s, err: %v)",
j.name,
attempts,
j.start.Format(time.DateTime),
latency.String(),
err,
)
}

var (
successLabel = JobStatusOK
successCounter = j.m.oks
Expand Down
17 changes: 7 additions & 10 deletions tests/slo/internal/workers/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter
}
}

func (w *Workers) read(ctx context.Context) (err error) {
func (w *Workers) read(ctx context.Context) error {
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important

var attempts int

m := w.m.Start(metrics.JobRead)
defer func() {
m.Stop(err, attempts)
if err != nil {
log.Printf("get entry error: %v", err)
}
}()

_, attempts, err = w.s.Read(ctx, id)
_, attempts, err := w.s.Read(ctx, id)
if err != nil {
log.Printf("read failed with %d attempts: %v", attempts, err)
}

m.Finish(err, attempts)

return err
}
20 changes: 8 additions & 12 deletions tests/slo/internal/workers/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,22 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite
}
}

func (w *Workers) write(ctx context.Context, gen *generator.Generator) (err error) {
var row generator.Row
row, err = gen.Generate()
func (w *Workers) write(ctx context.Context, gen *generator.Generator) error {
row, err := gen.Generate()
if err != nil {
log.Printf("generate error: %v", err)

return err
}

var attempts int

m := w.m.Start(metrics.JobWrite)
defer func() {
m.Stop(err, attempts)
if err != nil {
log.Printf("error when stop 'write' worker: %v", err)
}
}()

attempts, err = w.s.Write(ctx, row)
attempts, err := w.s.Write(ctx, row)
if err != nil {
log.Printf("write failed with %d attempts: %v", attempts, err)
}

m.Finish(err, attempts)

return err
}
5 changes: 5 additions & 0 deletions tests/slo/native/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func main() {
ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second)
defer cancel()

go func() {
<-ctx.Done()
log.Println("exiting...")
}()

s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, label)
if err != nil {
panic(fmt.Errorf("create storage failed: %w", err))
Expand Down

0 comments on commit 3be1b9c

Please sign in to comment.