Skip to content

Commit

Permalink
Merge pull request #1165 from ydb-platform/create-session
Browse files Browse the repository at this point in the history
* Renamed `db.Coordination().CreateSession()` to `db.Coordination().Session()` for compatibility with protos
  • Loading branch information
asmyasnikov authored Mar 26, 2024
2 parents 66d2102 + b117bb2 commit 02aaa6e
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Renamed `db.Coordination().CreateSession()` to `db.Coordination().Session()` for compatibility with protos

## v3.61.0
* Added `Tuple` support for `Variant` in `ydb.ParamsBuilder()`

Expand Down
4 changes: 2 additions & 2 deletions coordination/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Client interface {
DropNode(ctx context.Context, path string) (err error)
DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error)

// CreateSession starts a new session. This method blocks until the server session is created. The context provided
// Session starts a new session. This method blocks until the server session is created. The context provided
// may be used to cancel the invocation. If the method completes successfully, the session remains alive even if
// the context is canceled.
//
Expand All @@ -29,7 +29,7 @@ type Client interface {
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
CreateSession(ctx context.Context, path string, opts ...options.CreateSessionOption) (Session, error)
Session(ctx context.Context, path string, opts ...options.SessionOption) (Session, error)
}

const (
Expand Down
2 changes: 1 addition & 1 deletion coordination/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func Example_semaphore() {
}
fmt.Printf("node description: %+v\nnode config: %+v\n", e, c)

s, err := db.Coordination().CreateSession(ctx, "/local/test")
s, err := db.Coordination().Session(ctx, "/local/test")
if err != nil {
fmt.Printf("failed to create session: %v\n", err)

Expand Down
32 changes: 16 additions & 16 deletions coordination/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,70 @@ import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination"
)

// WithDescription returns an CreateSessionOption that specifies a user-defined description that may be used to describe
// WithDescription returns an SessionOption that specifies a user-defined description that may be used to describe
// the client.
func WithDescription(description string) CreateSessionOption {
func WithDescription(description string) SessionOption {
return func(c *CreateSessionOptions) {
c.Description = description
}
}

// WithSessionTimeout returns an CreateSessionOption that specifies the timeout during which client may restore a
// WithSessionTimeout returns an SessionOption that specifies the timeout during which client may restore a
// detached session. The client is forced to terminate the session if the last successful session request occurred
// earlier than this time.
//
// If this is not set, the client uses the default 5 seconds.
func WithSessionTimeout(timeout time.Duration) CreateSessionOption {
func WithSessionTimeout(timeout time.Duration) SessionOption {
return func(c *CreateSessionOptions) {
c.SessionTimeout = timeout
}
}

// WithSessionStartTimeout returns an CreateSessionOption that specifies the time that the client should wait for a
// WithSessionStartTimeout returns an SessionOption that specifies the time that the client should wait for a
// response to the StartSession request from the server before it terminates the gRPC stream and tries to reconnect.
//
// If this is not set, the client uses the default time 1 second.
func WithSessionStartTimeout(timeout time.Duration) CreateSessionOption {
func WithSessionStartTimeout(timeout time.Duration) SessionOption {
return func(c *CreateSessionOptions) {
c.SessionStartTimeout = timeout
}
}

// WithSessionStopTimeout returns an CreateSessionOption that specifies the time that the client should wait for a
// WithSessionStopTimeout returns an SessionOption that specifies the time that the client should wait for a
// response to the StopSession request from the server before it terminates the gRPC stream and tries to reconnect.
//
// If this is not set, the client uses the default time 1 second.
func WithSessionStopTimeout(timeout time.Duration) CreateSessionOption {
func WithSessionStopTimeout(timeout time.Duration) SessionOption {
return func(c *CreateSessionOptions) {
c.SessionStartTimeout = timeout
}
}

// WithSessionKeepAliveTimeout returns an CreateSessionOption that specifies the time that the client will wait before
// WithSessionKeepAliveTimeout returns an SessionOption that specifies the time that the client will wait before
// it terminates the gRPC stream and tries to reconnect if no successful responses have been received from the server.
//
// If this is not set, the client uses the default time 10 seconds.
func WithSessionKeepAliveTimeout(timeout time.Duration) CreateSessionOption {
func WithSessionKeepAliveTimeout(timeout time.Duration) SessionOption {
return func(c *CreateSessionOptions) {
c.SessionKeepAliveTimeout = timeout
}
}

// WithSessionReconnectDelay returns an CreateSessionOption that specifies the time that the client will wait before it
// WithSessionReconnectDelay returns an SessionOption that specifies the time that the client will wait before it
// tries to reconnect the underlying gRPC stream in case of error.
//
// If this is not set, the client uses the default time 500 milliseconds.
func WithSessionReconnectDelay(delay time.Duration) CreateSessionOption {
func WithSessionReconnectDelay(delay time.Duration) SessionOption {
return func(c *CreateSessionOptions) {
c.SessionReconnectDelay = delay
}
}

// CreateSessionOption configures how we create a new session.
type CreateSessionOption func(c *CreateSessionOptions)
// SessionOption configures how we create a new session.
type SessionOption func(c *CreateSessionOptions)

// CreateSessionOptions configure an CreateSession call. CreateSessionOptions are set by the CreateSessionOption values
// passed to the CreateSession function.
// CreateSessionOptions configure an Session call. CreateSessionOptions are set by the SessionOption values
// passed to the Session function.
type CreateSessionOptions struct {
Description string
SessionTimeout time.Duration
Expand Down
2 changes: 1 addition & 1 deletion examples/coordination/lock/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func main() {
for {
fmt.Println("waiting for a lock...")

session, err := db.Coordination().CreateSession(ctx, path)
session, err := db.Coordination().Session(ctx, path)
if err != nil {
fmt.Println("failed to open session", err)

Expand Down
2 changes: 1 addition & 1 deletion examples/coordination/workers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func main() {

fmt.Println("starting tasks")
for {
session, err := db.Coordination().CreateSession(ctx, path)
session, err := db.Coordination().Session(ctx, path)
if err != nil {
fmt.Println("failed to open session", err)

Expand Down
23 changes: 13 additions & 10 deletions internal/coordination/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func describeNode(
}, nil
}

func newCreateSessionConfig(opts ...options.CreateSessionOption) *options.CreateSessionOptions {
func newCreateSessionConfig(opts ...options.SessionOption) *options.CreateSessionOptions {
c := defaultCreateSessionConfig()
for _, o := range opts {
if o != nil {
Expand Down Expand Up @@ -344,17 +344,17 @@ func defaultCreateSessionConfig() *options.CreateSessionOptions {
}
}

func (c *Client) CreateSession(
func (c *Client) Session(
ctx context.Context,
path string,
opts ...options.CreateSessionOption,
opts ...options.SessionOption,
) (_ coordination.Session, finalErr error) {
if c == nil {
return nil, xerrors.WithStackTrace(errNilClient)
}

onDone := trace.CoordinationOnCreateSession(c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.(*Client).CreateSession"),
onDone := trace.CoordinationOnSession(c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.(*Client).Session"),
path,
)
defer func() {
Expand All @@ -364,17 +364,20 @@ func (c *Client) CreateSession(
return createSession(ctx, c, path, newCreateSessionConfig(opts...))
}

func (c *Client) Close(ctx context.Context) error {
func (c *Client) Close(ctx context.Context) (finalErr error) {
if c == nil {
return xerrors.WithStackTrace(errNilClient)
}

c.closeSessions(ctx)
onDone := trace.CoordinationOnClose(c.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.(*Client).Close"),
)
defer func() {
onDone(finalErr)
}()

return c.close(ctx)
}
c.closeSessions(ctx)

func (c *Client) close(context.Context) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion internal/coordination/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/coordination"
"github.com/ydb-platform/ydb-go-sdk/v3/coordination/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/conversation"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func createSession(
path string,
opts *options.CreateSessionOptions,
) (*session, error) {
sessionCtx, cancel := context.WithCancel(context.Background())
sessionCtx, cancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
s := session{
options: opts,
client: client,
Expand Down
25 changes: 23 additions & 2 deletions log/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,36 @@ func internalCoordination(
}
}
},
OnCreateSession: func(info trace.CoordinationCreateSessionStartInfo) func(trace.CoordinationCreateSessionDoneInfo) {
OnSession: func(info trace.CoordinationSessionStartInfo) func(trace.CoordinationSessionDoneInfo) {
if d.Details()&trace.CoordinationEvents == 0 {
return nil
}
ctx := with(*info.Context, TRACE, "ydb", "coordination", "node", "describe")
l.Log(ctx, "start")
start := time.Now()

return func(info trace.CoordinationCreateSessionDoneInfo) {
return func(info trace.CoordinationSessionDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
)
} else {
l.Log(WithLevel(ctx, ERROR), "fail",
latencyField(start),
versionField(),
)
}
}
},
OnClose: func(info trace.CoordinationCloseStartInfo) func(trace.CoordinationCloseDoneInfo) {
if d.Details()&trace.CoordinationEvents == 0 {
return nil
}
ctx := with(*info.Context, TRACE, "ydb", "coordination", "close")
l.Log(ctx, "start")
start := time.Now()

return func(info trace.CoordinationCloseDoneInfo) {
if info.Error == nil {
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/coordination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestCoordinationSemaphore(t *testing.T) {
}
fmt.Printf("node description: %+v\nnode config: %+v\n", e, c)

s, err := db.Coordination().CreateSession(ctx, nodePath)
s, err := db.Coordination().Session(ctx, nodePath)
if err != nil {
t.Fatalf("failed to create session: %v\n", err)
}
Expand Down
30 changes: 21 additions & 9 deletions trace/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ type (
// Coordination specified trace of coordination client activity.
// gtrace:gen
Coordination struct {
OnNew func(CoordinationNewStartInfo) func(CoordinationNewDoneInfo)
OnCreateNode func(CoordinationCreateNodeStartInfo) func(CoordinationCreateNodeDoneInfo)
OnAlterNode func(CoordinationAlterNodeStartInfo) func(CoordinationAlterNodeDoneInfo)
OnDropNode func(CoordinationDropNodeStartInfo) func(CoordinationDropNodeDoneInfo)
OnDescribeNode func(CoordinationDescribeNodeStartInfo) func(CoordinationDescribeNodeDoneInfo)
OnCreateSession func(CoordinationCreateSessionStartInfo) func(CoordinationCreateSessionDoneInfo)
OnNew func(CoordinationNewStartInfo) func(CoordinationNewDoneInfo)
OnCreateNode func(CoordinationCreateNodeStartInfo) func(CoordinationCreateNodeDoneInfo)
OnAlterNode func(CoordinationAlterNodeStartInfo) func(CoordinationAlterNodeDoneInfo)
OnDropNode func(CoordinationDropNodeStartInfo) func(CoordinationDropNodeDoneInfo)
OnDescribeNode func(CoordinationDescribeNodeStartInfo) func(CoordinationDescribeNodeDoneInfo)
OnSession func(CoordinationSessionStartInfo) func(CoordinationSessionDoneInfo)
OnClose func(CoordinationCloseStartInfo) func(CoordinationCloseDoneInfo)

OnStreamNew func(CoordinationStreamNewStartInfo) func(CoordinationStreamNewDoneInfo)
OnSessionStarted func(CoordinationSessionStartedInfo)
Expand All @@ -47,7 +48,18 @@ type (
Context *context.Context
Call call
}
CoordinationNewDoneInfo struct{}
CoordinationNewDoneInfo struct{}
CoordinationCloseStartInfo struct {
// Context make available context in trace callback function.
// Pointer to context provide replacement of context in trace callback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside callback function
Context *context.Context
Call call
}
CoordinationCloseDoneInfo struct {
Error error
}
CoordinationCreateNodeStartInfo struct {
// Context make available context in trace callback function.
// Pointer to context provide replacement of context in trace callback function.
Expand Down Expand Up @@ -100,7 +112,7 @@ type (
CoordinationDescribeNodeDoneInfo struct {
Error error
}
CoordinationCreateSessionStartInfo struct {
CoordinationSessionStartInfo struct {
// Context make available context in trace callback function.
// Pointer to context provide replacement of context in trace callback function.
// Warning: concurrent access to pointer on client side must be excluded.
Expand All @@ -110,7 +122,7 @@ type (

Path string
}
CoordinationCreateSessionDoneInfo struct {
CoordinationSessionDoneInfo struct {
Error error
}
CoordinationStreamNewStartInfo struct{}
Expand Down
Loading

0 comments on commit 02aaa6e

Please sign in to comment.