Skip to content

Commit

Permalink
Merge pull request #1408 from ydb-platform/fix-session-pool
Browse files Browse the repository at this point in the history
Do not create new items if index is full
  • Loading branch information
asmyasnikov authored Aug 14, 2024
2 parents b6be3a9 + 8831525 commit 4c5f351
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed out of index item creation in `internal/pool.Pool`
* Fixed tracing of `(*grpcClientStream).finish` event

## v3.76.4
Expand Down
75 changes: 39 additions & 36 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,48 +294,51 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
return nil, xerrors.WithStackTrace(err)
}

select {
case <-p.done:
return nil, xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
return nil, xerrors.WithStackTrace(ctx.Err())
default:
var item PT
p.mu.WithLock(func() {
if len(p.idle) > 0 {
item, p.idle = p.idle[0], p.idle[1:]
p.stats.Idle().Dec()
}
})
for {
select {
case <-p.done:
return nil, xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
return nil, xerrors.WithStackTrace(ctx.Err())
default:
var item PT
p.mu.WithLock(func() {
if len(p.idle) > 0 {
item, p.idle = p.idle[0], p.idle[1:]
p.stats.Idle().Dec()
}
})

if item != nil {
if item.IsAlive() {
return item, nil
if item != nil {
if item.IsAlive() {
return item, nil
}
_ = p.closeItem(ctx, item)
p.mu.WithLock(func() {
delete(p.index, item)
})
p.stats.Index().Dec()
}
_ = p.closeItem(ctx, item)
var err error
var newItem PT
p.mu.WithLock(func() {
delete(p.index, item)
if len(p.index) >= p.limit {
return
}
newItem, err = p.createItem(ctx)
if err != nil {
return
}
p.index[newItem] = struct{}{}
p.stats.Index().Inc()
})
p.stats.Index().Dec()
}

item, err := p.createItem(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

addedToIndex := false
p.mu.WithLock(func() {
if len(p.index) < p.limit {
p.index[item] = struct{}{}
addedToIndex = true
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
if newItem != nil {
return newItem, nil
}
})
if addedToIndex {
p.stats.Index().Inc()
}

return item, nil
}
}

Expand Down
26 changes: 26 additions & 0 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,32 @@ func TestPool(t *testing.T) {
wg.Wait()
}, xtest.StopAfter(42*time.Second))
})
t.Run("ParallelCreation", func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
p := New[*testItem, testItem](rootCtx)
var wg sync.WaitGroup
for range make([]struct{}, DefaultLimit*10) {
wg.Add(1)
go func() {
defer wg.Done()
err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
return nil
})
if err != nil && !xerrors.Is(err, errClosedPool, context.Canceled) {
t.Failed()
}
stats := p.Stats()
require.LessOrEqual(t, stats.Idle+stats.InUse, DefaultLimit)
}()
}

wg.Wait()
stats := p.Stats()
require.Equal(t, DefaultLimit, stats.Limit)
require.Equal(t, 0, stats.InUse)
require.LessOrEqual(t, stats.Idle, DefaultLimit)
}, xtest.StopAfter(30*time.Second))
})
}

func TestSafeStatsRace(t *testing.T) {
Expand Down

0 comments on commit 4c5f351

Please sign in to comment.