Skip to content

Commit

Permalink
added package otel with tracing interface and implementations of trac…
Browse files Browse the repository at this point in the history
…e callbacks
  • Loading branch information
asmyasnikov committed Oct 14, 2024
1 parent 60754e6 commit 4c6cd58
Show file tree
Hide file tree
Showing 26 changed files with 3,604 additions and 1,314 deletions.
404 changes: 404 additions & 0 deletions internal/kv/field.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions log/field_test.go → internal/kv/field_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package log
package kv

import (
"errors"
Expand All @@ -16,7 +16,7 @@ func (s stringerTest) String() string {

func TestField_String(t *testing.T) {
for _, tt := range []struct {
f Field
f KeyValue
want string
panic bool
fail bool
Expand All @@ -42,7 +42,7 @@ func TestField_String(t *testing.T) {
{f: Any("any_string_ptr", func(v string) *string { return &v }("string pointer")), want: "*string(string pointer)"}, //nolint:lll
{f: Any("any_string_nil", (*string)(nil)), want: "<nil>"},
{f: Stringer("stringer", stringerTest("stringerTest")), want: "stringerTest"},
{f: Field{ftype: InvalidType, key: "invalid"}, want: "", panic: true},
{f: KeyValue{ftype: InvalidType, key: "invalid"}, want: "", panic: true},
} {
t.Run(tt.f.key, func(t *testing.T) {
// Known fieldType, but String() panics with it.
Expand All @@ -65,7 +65,7 @@ func TestField_String(t *testing.T) {
func TestField_AnyValue(t *testing.T) {
for _, tt := range []struct {
name string
f Field
f KeyValue
want interface{}
}{
{name: "int", f: Int("any", 1), want: 1},
Expand Down
105 changes: 53 additions & 52 deletions log/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/kv"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

Expand All @@ -29,8 +30,8 @@ func internalCoordination(

return func(info trace.CoordinationNewDoneInfo) {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
versionField(),
kv.Latency(start),
kv.Version(),
)
}
},
Expand All @@ -40,19 +41,19 @@ func internalCoordination(
}
ctx := with(*info.Context, TRACE, "ydb", "coordination", "node", "create")
l.Log(ctx, "start",
String("path", info.Path),
kv.String("path", info.Path),
)
start := time.Now()

return func(info trace.CoordinationCreateNodeDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
kv.Latency(start),
)
} else {
l.Log(WithLevel(ctx, ERROR), "fail",
latencyField(start),
versionField(),
kv.Latency(start),
kv.Version(),
)
}
}
Expand All @@ -63,19 +64,19 @@ func internalCoordination(
}
ctx := with(*info.Context, TRACE, "ydb", "coordination", "node", "alter")
l.Log(ctx, "start",
String("path", info.Path),
kv.String("path", info.Path),
)
start := time.Now()

return func(info trace.CoordinationAlterNodeDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
kv.Latency(start),
)
} else {
l.Log(WithLevel(ctx, ERROR), "fail",
latencyField(start),
versionField(),
kv.Latency(start),
kv.Version(),
)
}
}
Expand All @@ -86,19 +87,19 @@ func internalCoordination(
}
ctx := with(*info.Context, TRACE, "ydb", "coordination", "node", "drop")
l.Log(ctx, "start",
String("path", info.Path),
kv.String("path", info.Path),
)
start := time.Now()

return func(info trace.CoordinationDropNodeDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
kv.Latency(start),
)
} else {
l.Log(WithLevel(ctx, ERROR), "fail",
latencyField(start),
versionField(),
kv.Latency(start),
kv.Version(),
)
}
}
Expand All @@ -109,19 +110,19 @@ func internalCoordination(
}
ctx := with(*info.Context, TRACE, "ydb", "coordination", "node", "describe")
l.Log(ctx, "start",
String("path", info.Path),
kv.String("path", info.Path),
)
start := time.Now()

return func(info trace.CoordinationDescribeNodeDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
kv.Latency(start),
)
} else {
l.Log(WithLevel(ctx, ERROR), "fail",
latencyField(start),
versionField(),
kv.Latency(start),
kv.Version(),
)
}
}
Expand All @@ -137,12 +138,12 @@ func internalCoordination(
return func(info trace.CoordinationSessionDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
kv.Latency(start),
)
} else {
l.Log(WithLevel(ctx, ERROR), "fail",
latencyField(start),
versionField(),
kv.Latency(start),
kv.Version(),
)
}
}
Expand All @@ -158,12 +159,12 @@ func internalCoordination(
return func(info trace.CoordinationCloseDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
kv.Latency(start),
)
} else {
l.Log(WithLevel(ctx, ERROR), "fail",
latencyField(start),
versionField(),
kv.Latency(start),
kv.Version(),
)
}
}
Expand All @@ -182,9 +183,9 @@ func internalCoordination(

return func(info trace.CoordinationStreamNewDoneInfo) {
l.Log(ctx, "done",
latencyField(start),
Error(info.Error),
versionField(),
kv.Latency(start),
kv.Error(info.Error),
kv.Version(),
)
}
},
Expand All @@ -194,8 +195,8 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "started")
l.Log(ctx, "",
String("sessionID", strconv.FormatUint(info.SessionID, 10)),
String("expectedSessionID", strconv.FormatUint(info.SessionID, 10)),
kv.String("sessionID", strconv.FormatUint(info.SessionID, 10)),
kv.String("expectedSessionID", strconv.FormatUint(info.SessionID, 10)),
)
},
OnSessionStartTimeout: func(info trace.CoordinationSessionStartTimeoutInfo) {
Expand All @@ -204,7 +205,7 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "start", "timeout")
l.Log(ctx, "",
Stringer("timeout", info.Timeout),
kv.Stringer("timeout", info.Timeout),
)
},
OnSessionKeepAliveTimeout: func(info trace.CoordinationSessionKeepAliveTimeoutInfo) {
Expand All @@ -213,8 +214,8 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "keepAlive", "timeout")
l.Log(ctx, "",
Stringer("timeout", info.Timeout),
Stringer("lastGoodResponseTime", info.LastGoodResponseTime),
kv.Stringer("timeout", info.Timeout),
kv.Stringer("lastGoodResponseTime", info.LastGoodResponseTime),
)
},
OnSessionStopped: func(info trace.CoordinationSessionStoppedInfo) {
Expand All @@ -223,8 +224,8 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "stopped")
l.Log(ctx, "",
String("sessionID", strconv.FormatUint(info.SessionID, 10)),
String("expectedSessionID", strconv.FormatUint(info.SessionID, 10)),
kv.String("sessionID", strconv.FormatUint(info.SessionID, 10)),
kv.String("expectedSessionID", strconv.FormatUint(info.SessionID, 10)),
)
},
OnSessionStopTimeout: func(info trace.CoordinationSessionStopTimeoutInfo) {
Expand All @@ -233,7 +234,7 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "stop", "timeout")
l.Log(ctx, "",
Stringer("timeout", info.Timeout),
kv.Stringer("timeout", info.Timeout),
)
},
OnSessionClientTimeout: func(info trace.CoordinationSessionClientTimeoutInfo) {
Expand All @@ -242,8 +243,8 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "client", "timeout")
l.Log(ctx, "",
Stringer("timeout", info.Timeout),
Stringer("lastGoodResponseTime", info.LastGoodResponseTime),
kv.Stringer("timeout", info.Timeout),
kv.Stringer("lastGoodResponseTime", info.LastGoodResponseTime),
)
},
OnSessionServerExpire: func(info trace.CoordinationSessionServerExpireInfo) {
Expand All @@ -252,7 +253,7 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "server", "expire")
l.Log(ctx, "",
Stringer("failure", info.Failure),
kv.Stringer("failure", info.Failure),
)
},
OnSessionServerError: func(info trace.CoordinationSessionServerErrorInfo) {
Expand All @@ -261,7 +262,7 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "server", "error")
l.Log(ctx, "",
Stringer("failure", info.Failure),
kv.Stringer("failure", info.Failure),
)
},
OnSessionReceive: func(
Expand All @@ -278,10 +279,10 @@ func internalCoordination(

return func(info trace.CoordinationSessionReceiveDoneInfo) {
l.Log(ctx, "done",
latencyField(start),
Error(info.Error),
Stringer("response", info.Response),
versionField(),
kv.Latency(start),
kv.Error(info.Error),
kv.Stringer("response", info.Response),
kv.Version(),
)
}
},
Expand All @@ -291,7 +292,7 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "receive", "unexpected")
l.Log(ctx, "",
Stringer("response", info.Response),
kv.Stringer("response", info.Response),
)
},
OnSessionStop: func(info trace.CoordinationSessionStopInfo) {
Expand All @@ -300,7 +301,7 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "stop")
l.Log(ctx, "",
String("sessionID", strconv.FormatUint(info.SessionID, 10)),
kv.String("sessionID", strconv.FormatUint(info.SessionID, 10)),
)
},
OnSessionStart: func(
Expand All @@ -317,9 +318,9 @@ func internalCoordination(

return func(info trace.CoordinationSessionStartDoneInfo) {
l.Log(ctx, "done",
latencyField(start),
Error(info.Error),
versionField(),
kv.Latency(start),
kv.Error(info.Error),
kv.Version(),
)
}
},
Expand All @@ -333,15 +334,15 @@ func internalCoordination(
}
ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "send")
l.Log(ctx, "start",
Stringer("request", info.Request),
kv.Stringer("request", info.Request),
)
start := time.Now()

return func(info trace.CoordinationSessionSendDoneInfo) {
l.Log(ctx, "done",
latencyField(start),
Error(info.Error),
versionField(),
kv.Latency(start),
kv.Error(info.Error),
kv.Version(),
)
}
},
Expand Down
27 changes: 14 additions & 13 deletions log/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package log
import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/kv"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

Expand All @@ -18,22 +19,22 @@ func internalDiscovery(l Logger, d trace.Detailer) (t trace.Discovery) {
}
ctx := with(*info.Context, DEBUG, "ydb", "discovery", "list", "endpoints")
l.Log(ctx, "start",
String("address", info.Address),
String("database", info.Database),
kv.String("address", info.Address),
kv.String("database", info.Database),
)
start := time.Now()

return func(info trace.DiscoveryDiscoverDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
Stringer("endpoints", endpoints(info.Endpoints)),
kv.Latency(start),
kv.Stringer("endpoints", kv.Endpoints(info.Endpoints)),
)
} else {
l.Log(WithLevel(ctx, ERROR), "failed",
Error(info.Error),
latencyField(start),
versionField(),
kv.Error(info.Error),
kv.Latency(start),
kv.Version(),
)
}
}
Expand All @@ -49,15 +50,15 @@ func internalDiscovery(l Logger, d trace.Detailer) (t trace.Discovery) {
return func(info trace.DiscoveryWhoAmIDoneInfo) {
if info.Error == nil {
l.Log(ctx, "done",
latencyField(start),
String("user", info.User),
Strings("groups", info.Groups),
kv.Latency(start),
kv.String("user", info.User),
kv.Strings("groups", info.Groups),
)
} else {
l.Log(WithLevel(ctx, WARN), "failed",
Error(info.Error),
latencyField(start),
versionField(),
kv.Error(info.Error),
kv.Latency(start),
kv.Version(),
)
}
}
Expand Down
Loading

0 comments on commit 4c6cd58

Please sign in to comment.