Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extracted query session controller logic into separated package #1457

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
)

type (
Item[T any] interface {
*T
Item interface {
IsAlive() bool
Close(ctx context.Context) error
}
Config[PT Item[T], T any] struct {
ItemConstraint[T any] interface {
*T
Item
}
Config[PT ItemConstraint[T], T any] struct {
trace *Trace
clock clockwork.Clock
limit int
Expand All @@ -32,15 +35,15 @@ type (
closeItem func(ctx context.Context, item PT)
idleThreshold time.Duration
}
itemInfo[PT Item[T], T any] struct {
itemInfo[PT ItemConstraint[T], T any] struct {
idle *xlist.Element[PT]
touched time.Time
}
waitChPool[PT Item[T], T any] interface {
waitChPool[PT ItemConstraint[T], T any] interface {
GetOrNew() *chan PT
Put(t *chan PT)
}
Pool[PT Item[T], T any] struct {
Pool[PT ItemConstraint[T], T any] struct {
config Config[PT, T]

createItem func(ctx context.Context) (PT, error)
Expand All @@ -55,62 +58,62 @@ type (

done chan struct{}
}
option[PT Item[T], T any] func(c *Config[PT, T])
Option[PT ItemConstraint[T], T any] func(c *Config[PT, T])
)

func WithCreateItemFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] {
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createItem = f
}
}

func WithSyncCloseItem[PT Item[T], T any]() option[PT, T] {
func WithSyncCloseItem[PT ItemConstraint[T], T any]() Option[PT, T] {
return func(c *Config[PT, T]) {
c.closeItem = func(ctx context.Context, item PT) {
_ = item.Close(ctx)
}
}
}

func WithCreateItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] {
func WithCreateItemTimeout[PT ItemConstraint[T], T any](t time.Duration) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createTimeout = t
}
}

func WithCloseItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] {
func WithCloseItemTimeout[PT ItemConstraint[T], T any](t time.Duration) Option[PT, T] {
return func(c *Config[PT, T]) {
c.closeTimeout = t
}
}

func WithLimit[PT Item[T], T any](size int) option[PT, T] {
func WithLimit[PT ItemConstraint[T], T any](size int) Option[PT, T] {
return func(c *Config[PT, T]) {
c.limit = size
}
}

func WithTrace[PT Item[T], T any](t *Trace) option[PT, T] {
func WithTrace[PT ItemConstraint[T], T any](t *Trace) Option[PT, T] {
return func(c *Config[PT, T]) {
c.trace = t
}
}

func WithIdleThreshold[PT Item[T], T any](idleThreshold time.Duration) option[PT, T] {
func WithIdleThreshold[PT ItemConstraint[T], T any](idleThreshold time.Duration) Option[PT, T] {
return func(c *Config[PT, T]) {
c.idleThreshold = idleThreshold
}
}

func WithClock[PT Item[T], T any](clock clockwork.Clock) option[PT, T] {
func WithClock[PT ItemConstraint[T], T any](clock clockwork.Clock) Option[PT, T] {
return func(c *Config[PT, T]) {
c.clock = clock
}
}

func New[PT Item[T], T any](
func New[PT ItemConstraint[T], T any](
ctx context.Context,
opts ...option[PT, T],
opts ...Option[PT, T],
) *Pool[PT, T] {
p := &Pool[PT, T]{
config: Config[PT, T]{
Expand Down Expand Up @@ -162,14 +165,14 @@ func New[PT Item[T], T any](
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT Item[T]](context.Context) (PT, error) {
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
var item T

return &item, nil
}

// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
func makeAsyncCreateItemFunc[PT Item[T], T any]( //nolint:funlen
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
p *Pool[PT, T],
) func(ctx context.Context) (PT, error) {
return func(ctx context.Context) (PT, error) {
Expand Down Expand Up @@ -277,7 +280,7 @@ func (p *Pool[PT, T]) Stats() Stats {
return p.stats()
}

func makeAsyncCloseItemFunc[PT Item[T], T any](
func makeAsyncCloseItemFunc[PT ItemConstraint[T], T any](
p *Pool[PT, T],
) func(ctx context.Context, item PT) {
return func(ctx context.Context, item PT) {
Expand Down
4 changes: 2 additions & 2 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func caller() string {
return fmt.Sprintf("%s:%d", path.Base(file), line)
}

func mustGetItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T]) PT {
func mustGetItem[PT ItemConstraint[T], T any](t testing.TB, p *Pool[PT, T]) PT {
s, err := p.getItem(context.Background())
if err != nil {
t.Helper()
Expand All @@ -145,7 +145,7 @@ func mustGetItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T]) PT {
return s
}

func mustPutItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T], item PT) {
func mustPutItem[PT ItemConstraint[T], T any](t testing.TB, p *Pool[PT, T], item PT) {
if err := p.putItem(context.Background(), item); err != nil {
t.Helper()
t.Fatalf("%s: %v", caller(), err)
Expand Down
45 changes: 24 additions & 21 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
Expand All @@ -41,9 +42,9 @@ type (
With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error
}
Client struct {
config *config.Config
queryServiceClient Ydb_Query_V1.QueryServiceClient
pool sessionPool
config *config.Config
client Ydb_Query_V1.QueryServiceClient
pool sessionPool

done chan struct{}
}
Expand Down Expand Up @@ -100,7 +101,7 @@ func (c *Client) FetchScriptResults(ctx context.Context,
opID string, opts ...options.FetchScriptOption,
) (*options.FetchScriptResult, error) {
r, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) {
r, err := fetchScriptResults(ctx, c.queryServiceClient, opID,
r, err := fetchScriptResults(ctx, c.client, opID,
append(opts, func(request *options.FetchScriptResultsRequest) {
request.Trace = c.config.Trace()
})...,
Expand Down Expand Up @@ -175,7 +176,7 @@ func (c *Client) ExecuteScript(

request, grpcOpts := executeQueryScriptRequest(a, q, settings)

op, err = executeScript(ctx, c.queryServiceClient, request, grpcOpts...)
op, err = executeScript(ctx, c.client, request, grpcOpts...)
if err != nil {
return op, xerrors.WithStackTrace(err)
}
Expand All @@ -200,18 +201,18 @@ func do(
opts ...retry.Option,
) (finalErr error) {
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
s.setStatus(statusInUse)
s.SetStatus(session.StatusInUse)

err := op(ctx, s)
if err != nil {
if xerrors.IsOperationError(err) {
s.setStatus(statusClosed)
s.SetStatus(session.StatusClosed)
}

return xerrors.WithStackTrace(err)
}

s.setStatus(statusIdle)
s.SetStatus(session.StatusIdle)

return nil
}, opts...)
Expand Down Expand Up @@ -337,7 +338,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, withTrace(s.cfg.Trace()))
_, r, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -381,8 +382,8 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
) {
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, streamResult, err := execute(ctx, s.id, s.queryServiceClient, q,
options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()),
_, streamResult, err := execute(ctx, s.ID(), s.client, q,
options.ExecuteSettings(opts...), withTrace(s.trace),
)
if err != nil {
return xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -433,7 +434,7 @@ func clientQueryResultSet(
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
) (rs result.ClosableResultSet, finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
_, r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -512,18 +513,18 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
return nil
}

func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *Client {
onDone := trace.QueryOnNew(cfg.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"),
)
defer onDone()

grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)
client := Ydb_Query_V1.NewQueryServiceClient(cc)

client := &Client{
config: cfg,
queryServiceClient: grpcClient,
done: make(chan struct{}),
return &Client{
config: cfg,
client: client,
done: make(chan struct{}),
pool: pool.New(ctx,
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
Expand All @@ -541,7 +542,11 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
}
defer cancelCreate()

s, err := createSession(createCtx, grpcClient, cfg)
s, err := createSession(createCtx, client,
session.WithConn(cc),
session.WithDeleteTimeout(cfg.SessionDeleteTimeout()),
session.WithTrace(cfg.Trace()),
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand All @@ -550,8 +555,6 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
}),
),
}

return client
}

func poolTrace(t *trace.Query) *pool.Trace {
Expand Down
Loading
Loading