Skip to content

Commit

Permalink
refactor(storagenode): remove unused lines
Browse files Browse the repository at this point in the history
Removed dead code that was commented out. This improves code readability and
maintainability by eliminating dead code.
  • Loading branch information
ijsong committed Feb 3, 2025
1 parent 647cba4 commit a5563c6
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 200 deletions.
106 changes: 0 additions & 106 deletions internal/storage/write_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,11 @@ func (wb *WriteBatch) release() {
writeBatchPool.Put(wb)
}

/*
// Deferred returns a new DeferredWriteBatch that can be used to defer assigning LLSNs.
func (wb *WriteBatch) Deferred(poolIdx int) *DeferredWriteBatch {
if wb.count != 0 {
panic("storage: non empty write batch could not be deferred")
}
dwb := newDeferredWriteBatch(wb, poolIdx)
return dwb
}
*/

// Put writes the given LLSN and data to the batch.
//func (wb *WriteBatch) Put(llsn types.LLSN, data []byte) error {
// dk := encodeDataKeyInternal(llsn, wb.dk)
// if err := wb.batch.Set(dk, data, nil); err != nil {
// return err
// }
// return nil
//}

// Set writes the given LLSN and data to the batch.
func (wb *WriteBatch) Set(llsn types.LLSN, data []byte) error {
return wb.batch.Set(encodeDataKeyInternal(llsn, wb.dk), data, nil)
}

// SetDeferred writes the given LLSN and data to the batch.
//func (wb *WriteBatch) SetDeferred(llsn types.LLSN, data []byte) error {
// op := wb.batch.SetDeferred(dataKeyLength, len(data))
// op.Key = encodeDataKeyInternal(llsn, op.Key)
// copy(op.Value, data)
// op.Finish()
// return nil
//}

// Apply applies the batch to the underlying storage.
func (wb *WriteBatch) Apply() error {
return wb.batch.Commit(wb.writeOpts)
Expand All @@ -81,80 +52,3 @@ func (wb *WriteBatch) Close() error {
wb.release()
return err
}

/*
var deferredWriteBatchPools = [...]sync.Pool{
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[0]),
poolIdx: 0,
}
},
},
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[1]),
poolIdx: 1,
}
},
},
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[2]),
poolIdx: 2,
}
},
},
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[3]),
poolIdx: 3,
}
},
},
}
type DeferredWriteBatch struct {
wb *WriteBatch
deferredOps []*pebble.DeferredBatchOp
poolIdx int
}
func newDeferredWriteBatch(wb *WriteBatch, poolIdx int) *DeferredWriteBatch {
dwb := deferredWriteBatchPools[poolIdx].Get().(*DeferredWriteBatch)
dwb.wb = wb
return dwb
}
func (dwb *DeferredWriteBatch) release() {
dwb.wb = nil
dwb.deferredOps = dwb.deferredOps[0:0]
deferredWriteBatchPools[dwb.poolIdx].Put(dwb)
}
func (dwb *DeferredWriteBatch) PutData(data []byte) {
deferredOp := dwb.wb.batch.SetDeferred(dataKeyLength, len(data))
copy(deferredOp.Value, data)
dwb.deferredOps = append(dwb.deferredOps, deferredOp)
}
func (dwb *DeferredWriteBatch) SetLLSN(idx int, llsn types.LLSN) {
dwb.deferredOps[idx].Key = encodeDataKeyInternal(llsn, dwb.deferredOps[idx].Key)
dwb.deferredOps[idx].Finish()
dwb.deferredOps[idx] = nil
}
func (dwb *DeferredWriteBatch) Apply() error {
return dwb.wb.Apply()
}
func (dwb *DeferredWriteBatch) Close() error {
err := dwb.wb.Close()
dwb.release()
return err
}
*/
2 changes: 0 additions & 2 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,9 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext
st.wwg = newWriteWaitGroup()
apc.wwg = st.wwg

// st.dwb = lse.stg.NewWriteBatch().Deferred(batchletClassIdx)
st.wb = lse.stg.NewWriteBatch()
st.cwts = newListQueue()
for i := 0; i < len(dataBatch); i++ {
// st.dwb.PutData(batchletData[i])
logEntrySize := int64(len(dataBatch[i]))
apc.totalBytes += logEntrySize
if lse.lsm != nil {
Expand Down
73 changes: 0 additions & 73 deletions internal/storagenode/logstream/commit_wait_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,76 +89,3 @@ func (cwq *commitWaitQueue) size() int {
cwq.mu.Unlock()
return ret
}

//type commitWaitQueueIterator struct {
// curr *list.Element
// queue *commitWaitQueue
//}
//
//func (iter commitWaitQueueIterator) task() *commitWaitTask {
// if !iter.valid() {
// return nil
// }
// return iter.curr.Value.(*commitWaitTask)
//}
//
//func (iter *commitWaitQueueIterator) next() bool {
// iter.queue.mu.Lock()
// defer iter.queue.mu.Unlock()
// if iter.curr != nil {
// iter.curr = iter.curr.Prev()
// }
// return iter.valid()
//}
//
//func (iter commitWaitQueueIterator) valid() bool {
// return iter.curr != nil
//}
//
//type commitWaitQueue struct {
// queue *list.List
// mu sync.Mutex
//}
//
//func newCommitWaitQueue() *commitWaitQueue {
// return &commitWaitQueue{
// queue: list.New(),
// }
//}
//
//func (cwq *commitWaitQueue) push(cwt *commitWaitTask) error {
// if cwt == nil {
// panic("log stream: commit wait queue: task is nil")
// }
// cwq.mu.Lock()
// cwq.queue.PushFront(cwt)
// cwq.mu.Unlock()
// return nil
//}
//
//func (cwq *commitWaitQueue) peekIterator() commitWaitQueueIterator {
// cwq.mu.Lock()
// iter := commitWaitQueueIterator{
// curr: cwq.queue.Back(),
// queue: cwq,
// }
// cwq.mu.Unlock()
// return iter
//}
//
//func (cwq *commitWaitQueue) pop() *commitWaitTask {
// cwq.mu.Lock()
// defer cwq.mu.Unlock()
// elem := cwq.queue.Back()
// if elem == nil {
// return nil
// }
// return cwq.queue.Remove(elem).(*commitWaitTask)
//}
//
//func (cwq *commitWaitQueue) size() int {
// cwq.mu.Lock()
// ret := cwq.queue.Len()
// cwq.mu.Unlock()
// return ret
//}
17 changes: 2 additions & 15 deletions internal/storagenode/logstream/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
if err := st.wb.Set(sq.llsn, st.dataBatch[dataIdx]); err != nil {
// TODO: handle error
}
// st.dwb.SetLLSN(dataIdx, sq.llsn)
}

operationEndTime = time.Now()
Expand Down Expand Up @@ -248,9 +247,8 @@ var sequenceTaskPool = sync.Pool{
}

type sequenceTask struct {
wwg *writeWaitGroup
awgs []*appendWaitGroup
// dwb *storage.DeferredWriteBatch
wwg *writeWaitGroup
awgs []*appendWaitGroup
wb *storage.WriteBatch
dataBatch [][]byte
cwts *listQueue
Expand All @@ -262,20 +260,9 @@ func newSequenceTask() *sequenceTask {
return st
}

//func newSequenceTask(wwg *writeWaitGroup, dwb *storage.DeferredWriteBatch, awgs []*appendWaitGroup, cwts *listQueue, rts []*replicateTask) *sequenceTask {
// st := sequenceTaskPool.Get().(*sequenceTask)
// st.wwg = wwg
// st.awgs = awgs
// st.dwb = dwb
// st.cwts = cwts
// st.rts = rts
// return st
//}

func (st *sequenceTask) release() {
st.wwg = nil
st.awgs = nil
// st.dwb = nil
st.wb = nil
st.dataBatch = nil
st.cwts = nil
Expand Down
3 changes: 1 addition & 2 deletions internal/storagenode/logstream/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func testSequenceTask(stg *storage.Storage) *sequenceTask {
awg := newAppendWaitGroup(st.wwg)
st.awgs = append(st.awgs, awg)

st.wb = stg.NewWriteBatch() // .Deferred(0)
st.wb = stg.NewWriteBatch()
st.dataBatch = [][]byte{nil}
// st.dwb.PutData(nil)

st.cwts = newListQueue()
st.cwts.PushFront(newCommitWaitTask(awg))
Expand Down
2 changes: 0 additions & 2 deletions internal/storagenode/logstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
cnt := len(st.awgs)
defer func() {
st.wwg.done(err)
// _ = st.dwb.Close()
_ = st.wb.Close()
st.release()
inflight := w.inflight.Add(-1)
Expand All @@ -100,7 +99,6 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
w.lse.lsm.WriterInflightOperations.Store(inflight)
}()

// err = st.dwb.Apply()
err = st.wb.Apply()
if err != nil {
w.logger.Error("could not apply data", zap.Error(err))
Expand Down

0 comments on commit a5563c6

Please sign in to comment.