Skip to content

Commit

Permalink
Merge pull request #374 from ydb-platform/close-session-by-nodeid
Browse files Browse the repository at this point in the history
marked truncated result as retryable error + added closing sessions i…
  • Loading branch information
asmyasnikov authored Sep 7, 2022
2 parents e0268b8 + 9926384 commit 510ab4b
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 107 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
* Marked the truncated result as retryable error
* Added closing sessions if node removed from discovery results
* Moved session status type from `table/options` package to `table`
* Changed session status source type from `uint32` to `string` alias

## v3.37.6
* Added to balancer notifying mechanism for listening in table client event about removing some nodes and closing sessions on them
* Removed from public client interfaces `closer.Closer` (for exclude undefined behaviour on client-side)
Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Balancer struct {
onDiscovery []func(ctx context.Context, endpoints []endpoint.Info)
}

func (b *Balancer) OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info)) {
func (b *Balancer) OnUpdate(onDiscovery func(ctx context.Context, endpoints []endpoint.Info)) {
b.mu.WithLock(func() {
b.onDiscovery = append(b.onDiscovery, onDiscovery)
})
Expand Down
2 changes: 1 addition & 1 deletion internal/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func withLastUpdated(ts time.Time) Option {
}
}

func New(address string, opts ...Option) Endpoint {
func New(address string, opts ...Option) *endpoint {
e := &endpoint{
address: address,
lastUpdated: time.Now(),
Expand Down
139 changes: 87 additions & 52 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/testutil/timeutil"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)
Expand All @@ -32,7 +31,7 @@ type sessionBuilder func(ctx context.Context, opts ...sessionBuilderOption) (*se
type balancerNotifier interface {
grpc.ClientConnInterface

OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info))
OnUpdate(onDiscovery func(ctx context.Context, endpoints []endpoint.Info))
}

func New(balancer balancerNotifier, config config.Config) *Client {
Expand All @@ -55,6 +54,7 @@ func newClient(
cc: balancer,
build: builder,
index: make(map[*session]sessionInfo),
nodes: make(map[uint32]map[*session]struct{}),
idle: list.New(),
waitq: list.New(),
limit: config.SizeLimit(),
Expand All @@ -67,7 +67,7 @@ func newClient(
done: make(chan struct{}),
}
if balancer != nil {
balancer.OnDiscovery(c.onDiscovery)
balancer.OnUpdate(c.updateNodes)
}
if idleThreshold := config.IdleThreshold(); idleThreshold > 0 {
c.spawnedGoroutines.Add(1)
Expand All @@ -88,6 +88,7 @@ type Client struct {
// read-write fields
mu xsync.Mutex
index map[*session]sessionInfo
nodes map[uint32]map[*session]struct{}
createInProgress int // KIKIMR-9163: in-create-process counter
limit int // Upper bound for Client size.
idle *list.List // list<*session>
Expand Down Expand Up @@ -118,7 +119,7 @@ func withCreateSessionOnClose(onClose func(s *session)) createSessionOption {
}
}

func (c *Client) onDiscovery(ctx context.Context, endpoints []endpoint.Info) {
func (c *Client) updateNodes(ctx context.Context, endpoints []endpoint.Info) {
nodeIDs := make([]uint32, len(endpoints))
for i, e := range endpoints {
nodeIDs[i] = e.NodeID()
Expand All @@ -127,26 +128,17 @@ func (c *Client) onDiscovery(ctx context.Context, endpoints []endpoint.Info) {
return nodeIDs[i] < nodeIDs[j]
})
c.mu.WithLock(func() {
touched := make(map[*session]struct{}, len(c.index))
for e := c.idle.Front(); e != nil; e = e.Next() {
s := e.Value.(*session)
nodeID := s.NodeID()
for nodeID := range c.nodes {
if sort.Search(len(nodeIDs), func(i int) bool {
return nodeIDs[i] >= nodeID
}) == len(nodeIDs) {
c.internalPoolAsyncCloseSession(ctx, s)
}
touched[s] = struct{}{}
}
for s := range c.index {
if _, has := touched[s]; has {
continue
}
nodeID := s.NodeID()
if sort.Search(len(nodeIDs), func(i int) bool {
return nodeIDs[i] >= nodeID
}) == len(nodeIDs) {
s.SetStatus(options.SessionClosing)
for s := range c.nodes[nodeID] {
if info, has := c.index[s]; has && info.idle != nil {
c.internalPoolAsyncCloseSession(ctx, s)
} else {
s.SetStatus(table.SessionClosing)
}
}
}
}
})
Expand Down Expand Up @@ -256,13 +248,51 @@ func (c *Client) createSession(ctx context.Context, opts ...createSessionOption)
}
}

func (c *Client) appendSessionToNodes(s *session) {
c.mu.WithLock(func() {
nodeID := s.NodeID()
sessions, has := c.nodes[nodeID]
if !has {
sessions = make(map[*session]struct{})
}
sessions[s] = struct{}{}
c.nodes[nodeID] = sessions
})
}

func (c *Client) removeSessionFromNodes(s *session) {
c.mu.WithLock(func() {
nodeID := s.NodeID()
sessions, has := c.nodes[nodeID]
if !has {
sessions = make(map[*session]struct{})
}
delete(sessions, s)
if len(sessions) == 0 {
delete(c.nodes, nodeID)
} else {
c.nodes[nodeID] = sessions
}
})
}

func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ table.ClosableSession, err error) {
if c == nil {
return nil, xerrors.WithStackTrace(errNilClient)
}
var s *session
createSession := func(ctx context.Context) (*session, error) {
s, err = c.createSession(ctx,
withCreateSessionOnCreate(c.appendSessionToNodes),
withCreateSessionOnClose(c.removeSessionFromNodes),
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
return s, nil
}
if !c.config.AutoRetry() {
s, err = c.createSession(ctx)
s, err = createSession(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand All @@ -272,7 +302,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
err = retry.Retry(
ctx,
func(ctx context.Context) (err error) {
s, err = c.createSession(ctx)
s, err = createSession(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -328,40 +358,45 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
})
}()

s, err = c.createSession(meta.WithAllowFeatures(ctx,
meta.HintSessionBalancer,
), withCreateSessionOnCreate(func(s *session) {
c.mu.WithLock(func() {
c.index[s] = sessionInfo{
touched: timeutil.Now(),
}
trace.TableOnPoolSessionAdd(c.config.Trace(), s)
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
})
}), withCreateSessionOnClose(func(s *session) {
c.mu.WithLock(func() {
info, has := c.index[s]
if !has {
panic("session not found in pool")
}
s, err = c.createSession(
meta.WithAllowFeatures(ctx,
meta.HintSessionBalancer,
),
withCreateSessionOnCreate(c.appendSessionToNodes),
withCreateSessionOnClose(c.removeSessionFromNodes),
withCreateSessionOnCreate(func(s *session) {
c.mu.WithLock(func() {
c.index[s] = sessionInfo{
touched: timeutil.Now(),
}
trace.TableOnPoolSessionAdd(c.config.Trace(), s)
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
})
}), withCreateSessionOnClose(func(s *session) {
c.mu.WithLock(func() {
info, has := c.index[s]
if !has {
panic("session not found in pool")
}

delete(c.index, s)
delete(c.index, s)

trace.TableOnPoolSessionRemove(c.config.Trace(), s)
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")
trace.TableOnPoolSessionRemove(c.config.Trace(), s)
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")

if !c.isClosed() {
c.internalPoolNotify(nil)
}
if !c.isClosed() {
c.internalPoolNotify(nil)
}

if info.idle != nil {
c.idle.Remove(info.idle)
}
})
}))
if info.idle != nil {
c.idle.Remove(info.idle)
}
})
}))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return s, nil
}

Expand Down Expand Up @@ -596,7 +631,7 @@ func (c *Client) Close(ctx context.Context) (err error) {
for e := c.idle.Front(); e != nil; e = e.Next() {
wg.Add(1)
s := e.Value.(*session)
s.SetStatus(options.SessionClosing)
s.SetStatus(table.SessionClosing)
go func() {
defer wg.Done()
c.internalPoolSyncCloseSession(ctx, s)
Expand Down Expand Up @@ -770,7 +805,7 @@ func (c *Client) internalPoolNotify(s *session) (notified bool) {
}

func (c *Client) internalPoolAsyncCloseSession(ctx context.Context, s *session) {
s.SetStatus(options.SessionClosing)
s.SetStatus(table.SessionClosing)
c.spawnedGoroutines.Add(1)
go func() {
defer c.spawnedGoroutines.Done()
Expand Down
3 changes: 1 addition & 2 deletions internal/table/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
)

Expand Down Expand Up @@ -154,7 +153,7 @@ func TestRetryerSessionClosing(t *testing.T) {
config.New(),
func(ctx context.Context, s table.Session) error {
sessions = append(sessions, s)
s.(*session).SetStatus(options.SessionClosing)
s.(*session).SetStatus(table.SessionClosing)
return nil
},
table.Options{},
Expand Down
2 changes: 1 addition & 1 deletion internal/table/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)

var errTruncated = xerrors.Wrap(errors.New("truncated result"))
var errTruncated = xerrors.Retryable(errors.New("truncated result"))

type scanner struct {
set *Ydb.ResultSet
Expand Down
32 changes: 19 additions & 13 deletions internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type session struct {
tableService Ydb_Table_V1.TableServiceClient
config config.Config

status options.SessionStatus
nodeID uint32
status table.SessionStatus
statusMtx sync.RWMutex
nodeID uint32

onClose []func(s *session)
closeOnce sync.Once
Expand All @@ -72,23 +73,27 @@ func (s *session) NodeID() uint32 {
return uint32(nodeID)
}

func (s *session) Status() string {
func (s *session) Status() table.SessionStatus {
if s == nil {
return ""
return table.SessionStatusUnknown
}
return options.SessionStatus(atomic.LoadUint32((*uint32)(&s.status))).String()
s.statusMtx.RLock()
defer s.statusMtx.RUnlock()
return s.status
}

func (s *session) SetStatus(status options.SessionStatus) {
atomic.StoreUint32((*uint32)(&s.status), uint32(status))
func (s *session) SetStatus(status table.SessionStatus) {
s.statusMtx.Lock()
defer s.statusMtx.Unlock()
s.status = status
}

func (s *session) isClosed() bool {
return options.SessionStatus(atomic.LoadUint32((*uint32)(&s.status))) == options.SessionClosed
return s.Status() == table.SessionClosed
}

func (s *session) isClosing() bool {
return options.SessionStatus(atomic.LoadUint32((*uint32)(&s.status))) == options.SessionClosing
return s.Status() == table.SessionClosing
}

func newSession(ctx context.Context, cc grpc.ClientConnInterface, config config.Config, opts ...sessionBuilderOption) (
Expand Down Expand Up @@ -129,6 +134,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config config.
id: result.GetSessionId(),
tableService: c,
config: config,
status: table.SessionReady,
}

for _, o := range opts {
Expand Down Expand Up @@ -159,7 +165,7 @@ func (s *session) Close(ctx context.Context) (err error) {

s.closeOnce.Do(func() {
defer func() {
s.SetStatus(options.SessionClosed)
s.SetStatus(table.SessionClosed)
}()

onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, s)
Expand Down Expand Up @@ -231,9 +237,9 @@ func (s *session) KeepAlive(ctx context.Context) (err error) {
}
switch result.SessionStatus {
case Ydb_Table.KeepAliveResult_SESSION_STATUS_READY:
s.SetStatus(options.SessionReady)
s.SetStatus(table.SessionReady)
case Ydb_Table.KeepAliveResult_SESSION_STATUS_BUSY:
s.SetStatus(options.SessionBusy)
s.SetStatus(table.SessionBusy)
}
return nil
}
Expand Down Expand Up @@ -446,7 +452,7 @@ func (s *session) checkError(err error) {
return
}
if m := retry.Check(err); m.MustDeleteSession() {
s.SetStatus(options.SessionClosing)
s.SetStatus(table.SessionClosing)
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/table/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ func TestSessionKeepAlive(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if s.Status() != options.SessionReady.String() {
t.Fatalf("Result %v differ from, expectd %v", s.Status(), options.SessionReady.String())
if s.Status() != table.SessionReady {
t.Fatalf("Result %v differ from, expectd %v", s.Status(), table.SessionReady)
}

status, e = Ydb_Table.KeepAliveResult_SESSION_STATUS_BUSY, nil
err = s.KeepAlive(ctx)
if err != nil {
t.Fatal(err)
}
if s.Status() != options.SessionBusy.String() {
t.Fatalf("Result %v differ from, expectd %v", s.Status(), options.SessionBusy.String())
if s.Status() != table.SessionBusy {
t.Fatalf("Result %v differ from, expectd %v", s.Status(), table.SessionBusy)
}
}

Expand Down
Loading

0 comments on commit 510ab4b

Please sign in to comment.