diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index 58d3df9ac..6ca5b0877 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -268,7 +268,7 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end // replicate tasks st.rts = newReplicateTaskSlice() for i := 0; i < numBackups; i++ { - rt := newReplicateTask(batchletClassIdx) + rt := newReplicateTask(end - begin) rt.tpid = lse.tpid rt.lsid = lse.lsid rt.dataList = batchletData diff --git a/internal/storagenode/logstream/replicate_client_test.go b/internal/storagenode/logstream/replicate_client_test.go index f2827cdac..04f275900 100644 --- a/internal/storagenode/logstream/replicate_client_test.go +++ b/internal/storagenode/logstream/replicate_client_test.go @@ -123,7 +123,7 @@ func TestReplicateClientRPCError(t *testing.T) { rc.queue = make(chan *replicateTask, 1) rc.streamClient = mockStreamClient - rt := newReplicateTask(0) + rt := newReplicateTaskDeprecated(0) rt.tpid = lse.tpid rt.lsid = lse.lsid rt.llsnList = append(rt.llsnList, 1) @@ -156,7 +156,7 @@ func TestReplicateClientDrain(t *testing.T) { rc.queue = make(chan *replicateTask, numTasks) for i := 0; i < numTasks; i++ { - rt := newReplicateTask(0) + rt := newReplicateTaskDeprecated(0) err := rc.send(context.Background(), rt) assert.NoError(t, err) } @@ -206,7 +206,7 @@ func TestReplicateClient(t *testing.T) { assert.NoError(t, err) defer rc.stop() - rt := newReplicateTask(0) + rt := newReplicateTaskDeprecated(0) rt.tpid = lse.tpid rt.lsid = lse.lsid rt.llsnList = append(rt.llsnList, 1) diff --git a/internal/storagenode/logstream/replicate_task.go b/internal/storagenode/logstream/replicate_task.go index f828b2a39..c5fbe9c23 100644 --- a/internal/storagenode/logstream/replicate_task.go +++ b/internal/storagenode/logstream/replicate_task.go @@ -17,15 +17,19 @@ type replicateTask struct { poolIdx int } -// newReplicateTask returns a new replicateTask. +// newReplicateTaskDeprecated returns a new replicateTask. // The argument poolIdx should be the index of replicateTaskPools, which is returned from batchlet.SelectLengthClass. -func newReplicateTask(poolIdx int) *replicateTask { +// +// Deprecated: Use newReplicateTask. +func newReplicateTaskDeprecated(poolIdx int) *replicateTask { rt := replicateTaskPools[poolIdx].Get().(*replicateTask) return rt } -// release releases the task to the pool. -func (rt *replicateTask) release() { +// releaseDeprecated releases the task to the pool. +// +// Deprecated: Use release. +func (rt *replicateTask) releaseDeprecated() { rt.tpid = 0 rt.lsid = 0 rt.llsnList = rt.llsnList[0:0] @@ -33,6 +37,25 @@ func (rt *replicateTask) release() { replicateTaskPools[rt.poolIdx].Put(rt) } +// newReplicateTask returns a new replicateTask. The capacity of the returned +// replicateTask's llsnList is equal to or greater than the argument size, and +// its length is zero. +// Since (snpb.ReplicateRequest).LLSN is deprecated, (*replicateTask).llsnList +// will be deprecated soon. Until that, newReplicateTask simplifies the pool +// management of replicateTask. +func newReplicateTask(size int) *replicateTask { + return defaultReplicateTaskPool.get(size) +} + +// release relreases the task to the pool. +func (rt *replicateTask) release() { + rt.tpid = 0 + rt.lsid = 0 + rt.llsnList = rt.llsnList[0:0] + rt.dataList = nil + defaultReplicateTaskPool.put(rt) +} + // releaseReplicateTasks releases all tasks in the list to the pool. func releaseReplicateTasks(rts []*replicateTask) { for i := range rts { @@ -40,42 +63,68 @@ func releaseReplicateTasks(rts []*replicateTask) { } } -var ( - replicateTaskPools = [...]sync.Pool{ - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[0]), - poolIdx: 0, - } - }, +// replicateTaskPool is a simple pool for replicateTask. +type replicateTaskPool struct { + pool sync.Pool +} + +var defaultReplicateTaskPool replicateTaskPool + +func (p *replicateTaskPool) get(size int) *replicateTask { + rt, ok := p.pool.Get().(*replicateTask) + if ok && cap(rt.llsnList) >= size { + rt.llsnList = rt.llsnList[0:0] + return rt + } + if ok { + p.pool.Put(rt) + } + return &replicateTask{ + llsnList: make([]types.LLSN, 0, size), + poolIdx: 0, + } +} + +func (p *replicateTaskPool) put(rt *replicateTask) { + p.pool.Put(rt) +} + +// replicateTaskPools is a set of pools for replicateTask. +// Deprecated: Use defaultReplicateTaskPool. +var replicateTaskPools = [...]sync.Pool{ + { + New: func() interface{} { + return &replicateTask{ + llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[0]), + poolIdx: 0, + } }, - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[1]), - poolIdx: 1, - } - }, + }, + { + New: func() interface{} { + return &replicateTask{ + llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[1]), + poolIdx: 1, + } }, - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[2]), - poolIdx: 2, - } - }, + }, + { + New: func() interface{} { + return &replicateTask{ + llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[2]), + poolIdx: 2, + } }, - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[3]), - poolIdx: 3, - } - }, + }, + { + New: func() interface{} { + return &replicateTask{ + llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[3]), + poolIdx: 3, + } }, - } -) + }, +} const defaultLengthOfReplicationTaskSlice = 3 diff --git a/internal/storagenode/logstream/replicate_task_test.go b/internal/storagenode/logstream/replicate_task_test.go index 7fbcfec9c..0e5228d10 100644 --- a/internal/storagenode/logstream/replicate_task_test.go +++ b/internal/storagenode/logstream/replicate_task_test.go @@ -9,9 +9,20 @@ import ( ) func TestReplicateTaskPools(t *testing.T) { - for poolIdx, batchletLen := range batchlet.LengthClasses { - rt := newReplicateTask(poolIdx) - assert.Empty(t, rt.llsnList) - assert.Equal(t, batchletLen, cap(rt.llsnList)) + const repeatCount = 1000 + + for range repeatCount { + for poolIdx, batchletLen := range batchlet.LengthClasses { + rt1 := newReplicateTaskDeprecated(poolIdx) + assert.Empty(t, rt1.llsnList) + assert.Equal(t, batchletLen, cap(rt1.llsnList)) + + rt2 := newReplicateTask(batchletLen) + assert.Empty(t, rt2.llsnList) + assert.GreaterOrEqual(t, cap(rt2.llsnList), batchletLen) + + rt1.releaseDeprecated() + rt2.release() + } } }