Skip to content

Commit

Permalink
Merge pull request #1457 from ydb-platform/query-session
Browse files Browse the repository at this point in the history
extracted query session controller logic into separated package
  • Loading branch information
asmyasnikov authored Sep 12, 2024
2 parents a06c3b5 + 802e7fb commit fb252b8
Show file tree
Hide file tree
Showing 13 changed files with 466 additions and 363 deletions.
43 changes: 23 additions & 20 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
)

type (
Item[T any] interface {
*T
Item interface {
IsAlive() bool
Close(ctx context.Context) error
}
Config[PT Item[T], T any] struct {
ItemConstraint[T any] interface {
*T
Item
}
Config[PT ItemConstraint[T], T any] struct {
trace *Trace
clock clockwork.Clock
limit int
Expand All @@ -32,15 +35,15 @@ type (
closeItem func(ctx context.Context, item PT)
idleThreshold time.Duration
}
itemInfo[PT Item[T], T any] struct {
itemInfo[PT ItemConstraint[T], T any] struct {
idle *xlist.Element[PT]
touched time.Time
}
waitChPool[PT Item[T], T any] interface {
waitChPool[PT ItemConstraint[T], T any] interface {
GetOrNew() *chan PT
Put(t *chan PT)
}
Pool[PT Item[T], T any] struct {
Pool[PT ItemConstraint[T], T any] struct {
config Config[PT, T]

createItem func(ctx context.Context) (PT, error)
Expand All @@ -55,62 +58,62 @@ type (

done chan struct{}
}
option[PT Item[T], T any] func(c *Config[PT, T])
Option[PT ItemConstraint[T], T any] func(c *Config[PT, T])
)

func WithCreateItemFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] {
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createItem = f
}
}

func WithSyncCloseItem[PT Item[T], T any]() option[PT, T] {
func WithSyncCloseItem[PT ItemConstraint[T], T any]() Option[PT, T] {
return func(c *Config[PT, T]) {
c.closeItem = func(ctx context.Context, item PT) {
_ = item.Close(ctx)
}
}
}

func WithCreateItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] {
func WithCreateItemTimeout[PT ItemConstraint[T], T any](t time.Duration) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createTimeout = t
}
}

func WithCloseItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] {
func WithCloseItemTimeout[PT ItemConstraint[T], T any](t time.Duration) Option[PT, T] {
return func(c *Config[PT, T]) {
c.closeTimeout = t
}
}

func WithLimit[PT Item[T], T any](size int) option[PT, T] {
func WithLimit[PT ItemConstraint[T], T any](size int) Option[PT, T] {
return func(c *Config[PT, T]) {
c.limit = size
}
}

func WithTrace[PT Item[T], T any](t *Trace) option[PT, T] {
func WithTrace[PT ItemConstraint[T], T any](t *Trace) Option[PT, T] {
return func(c *Config[PT, T]) {
c.trace = t
}
}

func WithIdleThreshold[PT Item[T], T any](idleThreshold time.Duration) option[PT, T] {
func WithIdleThreshold[PT ItemConstraint[T], T any](idleThreshold time.Duration) Option[PT, T] {
return func(c *Config[PT, T]) {
c.idleThreshold = idleThreshold
}
}

func WithClock[PT Item[T], T any](clock clockwork.Clock) option[PT, T] {
func WithClock[PT ItemConstraint[T], T any](clock clockwork.Clock) Option[PT, T] {
return func(c *Config[PT, T]) {
c.clock = clock
}
}

func New[PT Item[T], T any](
func New[PT ItemConstraint[T], T any](
ctx context.Context,
opts ...option[PT, T],
opts ...Option[PT, T],
) *Pool[PT, T] {
p := &Pool[PT, T]{
config: Config[PT, T]{
Expand Down Expand Up @@ -162,14 +165,14 @@ func New[PT Item[T], T any](
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT Item[T]](context.Context) (PT, error) {
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
var item T

return &item, nil
}

// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
func makeAsyncCreateItemFunc[PT Item[T], T any]( //nolint:funlen
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
p *Pool[PT, T],
) func(ctx context.Context) (PT, error) {
return func(ctx context.Context) (PT, error) {
Expand Down Expand Up @@ -277,7 +280,7 @@ func (p *Pool[PT, T]) Stats() Stats {
return p.stats()
}

func makeAsyncCloseItemFunc[PT Item[T], T any](
func makeAsyncCloseItemFunc[PT ItemConstraint[T], T any](
p *Pool[PT, T],
) func(ctx context.Context, item PT) {
return func(ctx context.Context, item PT) {
Expand Down
4 changes: 2 additions & 2 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func caller() string {
return fmt.Sprintf("%s:%d", path.Base(file), line)
}

func mustGetItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T]) PT {
func mustGetItem[PT ItemConstraint[T], T any](t testing.TB, p *Pool[PT, T]) PT {
s, err := p.getItem(context.Background())
if err != nil {
t.Helper()
Expand All @@ -145,7 +145,7 @@ func mustGetItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T]) PT {
return s
}

func mustPutItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T], item PT) {
func mustPutItem[PT ItemConstraint[T], T any](t testing.TB, p *Pool[PT, T], item PT) {
if err := p.putItem(context.Background(), item); err != nil {
t.Helper()
t.Fatalf("%s: %v", caller(), err)
Expand Down
45 changes: 24 additions & 21 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
Expand All @@ -41,9 +42,9 @@ type (
With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error
}
Client struct {
config *config.Config
queryServiceClient Ydb_Query_V1.QueryServiceClient
pool sessionPool
config *config.Config
client Ydb_Query_V1.QueryServiceClient
pool sessionPool

done chan struct{}
}
Expand Down Expand Up @@ -100,7 +101,7 @@ func (c *Client) FetchScriptResults(ctx context.Context,
opID string, opts ...options.FetchScriptOption,
) (*options.FetchScriptResult, error) {
r, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) {
r, err := fetchScriptResults(ctx, c.queryServiceClient, opID,
r, err := fetchScriptResults(ctx, c.client, opID,
append(opts, func(request *options.FetchScriptResultsRequest) {
request.Trace = c.config.Trace()
})...,
Expand Down Expand Up @@ -175,7 +176,7 @@ func (c *Client) ExecuteScript(

request, grpcOpts := executeQueryScriptRequest(a, q, settings)

op, err = executeScript(ctx, c.queryServiceClient, request, grpcOpts...)
op, err = executeScript(ctx, c.client, request, grpcOpts...)
if err != nil {
return op, xerrors.WithStackTrace(err)
}
Expand All @@ -200,18 +201,18 @@ func do(
opts ...retry.Option,
) (finalErr error) {
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
s.setStatus(statusInUse)
s.SetStatus(session.StatusInUse)

err := op(ctx, s)
if err != nil {
if xerrors.IsOperationError(err) {
s.setStatus(statusClosed)
s.SetStatus(session.StatusClosed)
}

return xerrors.WithStackTrace(err)
}

s.setStatus(statusIdle)
s.SetStatus(session.StatusIdle)

return nil
}, opts...)
Expand Down Expand Up @@ -337,7 +338,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, withTrace(s.cfg.Trace()))
_, r, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -381,8 +382,8 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
) {
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, streamResult, err := execute(ctx, s.id, s.queryServiceClient, q,
options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()),
_, streamResult, err := execute(ctx, s.ID(), s.client, q,
options.ExecuteSettings(opts...), withTrace(s.trace),
)
if err != nil {
return xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -433,7 +434,7 @@ func clientQueryResultSet(
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
) (rs result.ClosableResultSet, finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
_, r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -512,18 +513,18 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
return nil
}

func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *Client {
onDone := trace.QueryOnNew(cfg.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"),
)
defer onDone()

grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)
client := Ydb_Query_V1.NewQueryServiceClient(cc)

client := &Client{
config: cfg,
queryServiceClient: grpcClient,
done: make(chan struct{}),
return &Client{
config: cfg,
client: client,
done: make(chan struct{}),
pool: pool.New(ctx,
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
Expand All @@ -541,7 +542,11 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
}
defer cancelCreate()

s, err := createSession(createCtx, grpcClient, cfg)
s, err := createSession(createCtx, client,
session.WithConn(cc),
session.WithDeleteTimeout(cfg.SessionDeleteTimeout()),
session.WithTrace(cfg.Trace()),
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand All @@ -550,8 +555,6 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
}),
),
}

return client
}

func poolTrace(t *trace.Query) *pool.Trace {
Expand Down
Loading

0 comments on commit fb252b8

Please sign in to comment.