From b117bb22ddb2f4f55b8b00a7e01312fcc0f682bb Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 26 Mar 2024 15:12:58 +0300 Subject: [PATCH] * Renamed `db.Coordination().CreateSession()` to `db.Coordination().Session()` for compatibility with protos --- CHANGELOG.md | 2 + coordination/coordination.go | 4 +- coordination/example_test.go | 2 +- coordination/options/options.go | 32 +++++----- examples/coordination/lock/main.go | 2 +- examples/coordination/workers/main.go | 2 +- internal/coordination/client.go | 23 ++++--- internal/coordination/session.go | 2 +- log/coordination.go | 25 +++++++- tests/integration/coordination_test.go | 2 +- trace/coordination.go | 30 ++++++--- trace/coordination_gtrace.go | 87 ++++++++++++++++++++++---- 12 files changed, 156 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37a341e51..55c6dd433 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Renamed `db.Coordination().CreateSession()` to `db.Coordination().Session()` for compatibility with protos + ## v3.61.0 * Added `Tuple` support for `Variant` in `ydb.ParamsBuilder()` diff --git a/coordination/coordination.go b/coordination/coordination.go index 4fa79bd40..bfe6e9870 100644 --- a/coordination/coordination.go +++ b/coordination/coordination.go @@ -16,7 +16,7 @@ type Client interface { DropNode(ctx context.Context, path string) (err error) DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error) - // CreateSession starts a new session. This method blocks until the server session is created. The context provided + // Session starts a new session. This method blocks until the server session is created. The context provided // may be used to cancel the invocation. If the method completes successfully, the session remains alive even if // the context is canceled. // @@ -29,7 +29,7 @@ type Client interface { // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a later release. - CreateSession(ctx context.Context, path string, opts ...options.CreateSessionOption) (Session, error) + Session(ctx context.Context, path string, opts ...options.SessionOption) (Session, error) } const ( diff --git a/coordination/example_test.go b/coordination/example_test.go index 83ae594cc..20ca57852 100644 --- a/coordination/example_test.go +++ b/coordination/example_test.go @@ -81,7 +81,7 @@ func Example_semaphore() { } fmt.Printf("node description: %+v\nnode config: %+v\n", e, c) - s, err := db.Coordination().CreateSession(ctx, "/local/test") + s, err := db.Coordination().Session(ctx, "/local/test") if err != nil { fmt.Printf("failed to create session: %v\n", err) diff --git a/coordination/options/options.go b/coordination/options/options.go index 2f9c577db..e156bc068 100644 --- a/coordination/options/options.go +++ b/coordination/options/options.go @@ -7,70 +7,70 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination" ) -// WithDescription returns an CreateSessionOption that specifies a user-defined description that may be used to describe +// WithDescription returns an SessionOption that specifies a user-defined description that may be used to describe // the client. -func WithDescription(description string) CreateSessionOption { +func WithDescription(description string) SessionOption { return func(c *CreateSessionOptions) { c.Description = description } } -// WithSessionTimeout returns an CreateSessionOption that specifies the timeout during which client may restore a +// WithSessionTimeout returns an SessionOption that specifies the timeout during which client may restore a // detached session. The client is forced to terminate the session if the last successful session request occurred // earlier than this time. // // If this is not set, the client uses the default 5 seconds. -func WithSessionTimeout(timeout time.Duration) CreateSessionOption { +func WithSessionTimeout(timeout time.Duration) SessionOption { return func(c *CreateSessionOptions) { c.SessionTimeout = timeout } } -// WithSessionStartTimeout returns an CreateSessionOption that specifies the time that the client should wait for a +// WithSessionStartTimeout returns an SessionOption that specifies the time that the client should wait for a // response to the StartSession request from the server before it terminates the gRPC stream and tries to reconnect. // // If this is not set, the client uses the default time 1 second. -func WithSessionStartTimeout(timeout time.Duration) CreateSessionOption { +func WithSessionStartTimeout(timeout time.Duration) SessionOption { return func(c *CreateSessionOptions) { c.SessionStartTimeout = timeout } } -// WithSessionStopTimeout returns an CreateSessionOption that specifies the time that the client should wait for a +// WithSessionStopTimeout returns an SessionOption that specifies the time that the client should wait for a // response to the StopSession request from the server before it terminates the gRPC stream and tries to reconnect. // // If this is not set, the client uses the default time 1 second. -func WithSessionStopTimeout(timeout time.Duration) CreateSessionOption { +func WithSessionStopTimeout(timeout time.Duration) SessionOption { return func(c *CreateSessionOptions) { c.SessionStartTimeout = timeout } } -// WithSessionKeepAliveTimeout returns an CreateSessionOption that specifies the time that the client will wait before +// WithSessionKeepAliveTimeout returns an SessionOption that specifies the time that the client will wait before // it terminates the gRPC stream and tries to reconnect if no successful responses have been received from the server. // // If this is not set, the client uses the default time 10 seconds. -func WithSessionKeepAliveTimeout(timeout time.Duration) CreateSessionOption { +func WithSessionKeepAliveTimeout(timeout time.Duration) SessionOption { return func(c *CreateSessionOptions) { c.SessionKeepAliveTimeout = timeout } } -// WithSessionReconnectDelay returns an CreateSessionOption that specifies the time that the client will wait before it +// WithSessionReconnectDelay returns an SessionOption that specifies the time that the client will wait before it // tries to reconnect the underlying gRPC stream in case of error. // // If this is not set, the client uses the default time 500 milliseconds. -func WithSessionReconnectDelay(delay time.Duration) CreateSessionOption { +func WithSessionReconnectDelay(delay time.Duration) SessionOption { return func(c *CreateSessionOptions) { c.SessionReconnectDelay = delay } } -// CreateSessionOption configures how we create a new session. -type CreateSessionOption func(c *CreateSessionOptions) +// SessionOption configures how we create a new session. +type SessionOption func(c *CreateSessionOptions) -// CreateSessionOptions configure an CreateSession call. CreateSessionOptions are set by the CreateSessionOption values -// passed to the CreateSession function. +// CreateSessionOptions configure an Session call. CreateSessionOptions are set by the SessionOption values +// passed to the Session function. type CreateSessionOptions struct { Description string SessionTimeout time.Duration diff --git a/examples/coordination/lock/main.go b/examples/coordination/lock/main.go index 63f3c3a80..905bb1e4b 100644 --- a/examples/coordination/lock/main.go +++ b/examples/coordination/lock/main.go @@ -91,7 +91,7 @@ func main() { for { fmt.Println("waiting for a lock...") - session, err := db.Coordination().CreateSession(ctx, path) + session, err := db.Coordination().Session(ctx, path) if err != nil { fmt.Println("failed to open session", err) diff --git a/examples/coordination/workers/main.go b/examples/coordination/workers/main.go index a012d134b..451336a7a 100644 --- a/examples/coordination/workers/main.go +++ b/examples/coordination/workers/main.go @@ -110,7 +110,7 @@ func main() { fmt.Println("starting tasks") for { - session, err := db.Coordination().CreateSession(ctx, path) + session, err := db.Coordination().Session(ctx, path) if err != nil { fmt.Println("failed to open session", err) diff --git a/internal/coordination/client.go b/internal/coordination/client.go index a4429183e..70322f702 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -299,7 +299,7 @@ func describeNode( }, nil } -func newCreateSessionConfig(opts ...options.CreateSessionOption) *options.CreateSessionOptions { +func newCreateSessionConfig(opts ...options.SessionOption) *options.CreateSessionOptions { c := defaultCreateSessionConfig() for _, o := range opts { if o != nil { @@ -344,17 +344,17 @@ func defaultCreateSessionConfig() *options.CreateSessionOptions { } } -func (c *Client) CreateSession( +func (c *Client) Session( ctx context.Context, path string, - opts ...options.CreateSessionOption, + opts ...options.SessionOption, ) (_ coordination.Session, finalErr error) { if c == nil { return nil, xerrors.WithStackTrace(errNilClient) } - onDone := trace.CoordinationOnCreateSession(c.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.(*Client).CreateSession"), + onDone := trace.CoordinationOnSession(c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.(*Client).Session"), path, ) defer func() { @@ -364,17 +364,20 @@ func (c *Client) CreateSession( return createSession(ctx, c, path, newCreateSessionConfig(opts...)) } -func (c *Client) Close(ctx context.Context) error { +func (c *Client) Close(ctx context.Context) (finalErr error) { if c == nil { return xerrors.WithStackTrace(errNilClient) } - c.closeSessions(ctx) + onDone := trace.CoordinationOnClose(c.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.(*Client).Close"), + ) + defer func() { + onDone(finalErr) + }() - return c.close(ctx) -} + c.closeSessions(ctx) -func (c *Client) close(context.Context) error { return nil } diff --git a/internal/coordination/session.go b/internal/coordination/session.go index 32e5668b6..4d7ec4649 100644 --- a/internal/coordination/session.go +++ b/internal/coordination/session.go @@ -3,7 +3,6 @@ package coordination import ( "context" "encoding/binary" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "math" "math/rand" "sync" @@ -16,6 +15,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/coordination" "github.com/ydb-platform/ydb-go-sdk/v3/coordination/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/conversation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) diff --git a/log/coordination.go b/log/coordination.go index 24e8871ce..865b8cd9c 100644 --- a/log/coordination.go +++ b/log/coordination.go @@ -125,7 +125,7 @@ func internalCoordination( } } }, - OnCreateSession: func(info trace.CoordinationCreateSessionStartInfo) func(trace.CoordinationCreateSessionDoneInfo) { + OnSession: func(info trace.CoordinationSessionStartInfo) func(trace.CoordinationSessionDoneInfo) { if d.Details()&trace.CoordinationEvents == 0 { return nil } @@ -133,7 +133,28 @@ func internalCoordination( l.Log(ctx, "start") start := time.Now() - return func(info trace.CoordinationCreateSessionDoneInfo) { + return func(info trace.CoordinationSessionDoneInfo) { + if info.Error == nil { + l.Log(WithLevel(ctx, INFO), "done", + latencyField(start), + ) + } else { + l.Log(WithLevel(ctx, ERROR), "fail", + latencyField(start), + versionField(), + ) + } + } + }, + OnClose: func(info trace.CoordinationCloseStartInfo) func(trace.CoordinationCloseDoneInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "coordination", "close") + l.Log(ctx, "start") + start := time.Now() + + return func(info trace.CoordinationCloseDoneInfo) { if info.Error == nil { l.Log(WithLevel(ctx, INFO), "done", latencyField(start), diff --git a/tests/integration/coordination_test.go b/tests/integration/coordination_test.go index e4fce84fb..c3551e604 100644 --- a/tests/integration/coordination_test.go +++ b/tests/integration/coordination_test.go @@ -53,7 +53,7 @@ func TestCoordinationSemaphore(t *testing.T) { } fmt.Printf("node description: %+v\nnode config: %+v\n", e, c) - s, err := db.Coordination().CreateSession(ctx, nodePath) + s, err := db.Coordination().Session(ctx, nodePath) if err != nil { t.Fatalf("failed to create session: %v\n", err) } diff --git a/trace/coordination.go b/trace/coordination.go index d6e277e03..8c9ac18a0 100644 --- a/trace/coordination.go +++ b/trace/coordination.go @@ -15,12 +15,13 @@ type ( // Coordination specified trace of coordination client activity. // gtrace:gen Coordination struct { - OnNew func(CoordinationNewStartInfo) func(CoordinationNewDoneInfo) - OnCreateNode func(CoordinationCreateNodeStartInfo) func(CoordinationCreateNodeDoneInfo) - OnAlterNode func(CoordinationAlterNodeStartInfo) func(CoordinationAlterNodeDoneInfo) - OnDropNode func(CoordinationDropNodeStartInfo) func(CoordinationDropNodeDoneInfo) - OnDescribeNode func(CoordinationDescribeNodeStartInfo) func(CoordinationDescribeNodeDoneInfo) - OnCreateSession func(CoordinationCreateSessionStartInfo) func(CoordinationCreateSessionDoneInfo) + OnNew func(CoordinationNewStartInfo) func(CoordinationNewDoneInfo) + OnCreateNode func(CoordinationCreateNodeStartInfo) func(CoordinationCreateNodeDoneInfo) + OnAlterNode func(CoordinationAlterNodeStartInfo) func(CoordinationAlterNodeDoneInfo) + OnDropNode func(CoordinationDropNodeStartInfo) func(CoordinationDropNodeDoneInfo) + OnDescribeNode func(CoordinationDescribeNodeStartInfo) func(CoordinationDescribeNodeDoneInfo) + OnSession func(CoordinationSessionStartInfo) func(CoordinationSessionDoneInfo) + OnClose func(CoordinationCloseStartInfo) func(CoordinationCloseDoneInfo) OnStreamNew func(CoordinationStreamNewStartInfo) func(CoordinationStreamNewDoneInfo) OnSessionStarted func(CoordinationSessionStartedInfo) @@ -47,7 +48,18 @@ type ( Context *context.Context Call call } - CoordinationNewDoneInfo struct{} + CoordinationNewDoneInfo struct{} + CoordinationCloseStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + CoordinationCloseDoneInfo struct { + Error error + } CoordinationCreateNodeStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. @@ -100,7 +112,7 @@ type ( CoordinationDescribeNodeDoneInfo struct { Error error } - CoordinationCreateSessionStartInfo struct { + CoordinationSessionStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. @@ -110,7 +122,7 @@ type ( Path string } - CoordinationCreateSessionDoneInfo struct { + CoordinationSessionDoneInfo struct { Error error } CoordinationStreamNewStartInfo struct{} diff --git a/trace/coordination_gtrace.go b/trace/coordination_gtrace.go index c89040bac..91ccf3747 100644 --- a/trace/coordination_gtrace.go +++ b/trace/coordination_gtrace.go @@ -209,9 +209,9 @@ func (t *Coordination) Compose(x *Coordination, opts ...CoordinationComposeOptio } } { - h1 := t.OnCreateSession - h2 := x.OnCreateSession - ret.OnCreateSession = func(c CoordinationCreateSessionStartInfo) func(CoordinationCreateSessionDoneInfo) { + h1 := t.OnSession + h2 := x.OnSession + ret.OnSession = func(c CoordinationSessionStartInfo) func(CoordinationSessionDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -219,14 +219,49 @@ func (t *Coordination) Compose(x *Coordination, opts ...CoordinationComposeOptio } }() } - var r, r1 func(CoordinationCreateSessionDoneInfo) + var r, r1 func(CoordinationSessionDoneInfo) if h1 != nil { r = h1(c) } if h2 != nil { r1 = h2(c) } - return func(c CoordinationCreateSessionDoneInfo) { + return func(c CoordinationSessionDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(c) + } + if r1 != nil { + r1(c) + } + } + } + } + { + h1 := t.OnClose + h2 := x.OnClose + ret.OnClose = func(c CoordinationCloseStartInfo) func(CoordinationCloseDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(CoordinationCloseDoneInfo) + if h1 != nil { + r = h1(c) + } + if h2 != nil { + r1 = h2(c) + } + return func(c CoordinationCloseDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -650,16 +685,31 @@ func (t *Coordination) onDescribeNode(c CoordinationDescribeNodeStartInfo) func( } return res } -func (t *Coordination) onCreateSession(c CoordinationCreateSessionStartInfo) func(CoordinationCreateSessionDoneInfo) { - fn := t.OnCreateSession +func (t *Coordination) onSession(c CoordinationSessionStartInfo) func(CoordinationSessionDoneInfo) { + fn := t.OnSession + if fn == nil { + return func(CoordinationSessionDoneInfo) { + return + } + } + res := fn(c) + if res == nil { + return func(CoordinationSessionDoneInfo) { + return + } + } + return res +} +func (t *Coordination) onClose(c CoordinationCloseStartInfo) func(CoordinationCloseDoneInfo) { + fn := t.OnClose if fn == nil { - return func(CoordinationCreateSessionDoneInfo) { + return func(CoordinationCloseDoneInfo) { return } } res := fn(c) if res == nil { - return func(CoordinationCreateSessionDoneInfo) { + return func(CoordinationCloseDoneInfo) { return } } @@ -853,14 +903,25 @@ func CoordinationOnDescribeNode(t *Coordination, c *context.Context, call call, res(p) } } -func CoordinationOnCreateSession(t *Coordination, c *context.Context, call call, path string) func(error) { - var p CoordinationCreateSessionStartInfo +func CoordinationOnSession(t *Coordination, c *context.Context, call call, path string) func(error) { + var p CoordinationSessionStartInfo p.Context = c p.Call = call p.Path = path - res := t.onCreateSession(p) + res := t.onSession(p) + return func(e error) { + var p CoordinationSessionDoneInfo + p.Error = e + res(p) + } +} +func CoordinationOnClose(t *Coordination, c *context.Context, call call) func(error) { + var p CoordinationCloseStartInfo + p.Context = c + p.Call = call + res := t.onClose(p) return func(e error) { - var p CoordinationCreateSessionDoneInfo + var p CoordinationCloseDoneInfo p.Error = e res(p) }