Skip to content

Commit

Permalink
refactor(storagenode): committer to remove requireCommitWaitTasks par…
Browse files Browse the repository at this point in the history
…ameter

This PR removes the requireCommitWaitTasks parameter from the commitInternal
method. The parameter was used to support sync replication, which is the
recovery between two replicas in a log stream. However, sync replication doesn't
use commitInternal anymore.
  • Loading branch information
ijsong committed Feb 3, 2025
1 parent 176898b commit efdf19f
Showing 1 changed file with 7 additions and 17 deletions.
24 changes: 7 additions & 17 deletions internal/storagenode/logstream/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ func (cm *committer) commit(_ context.Context, ct *commitTask) error {
CommittedLLSNBegin: uncommittedLLSNBegin,
}

return cm.commitInternal(commitContext, true)
return cm.commitInternal(commitContext)
}

func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitTasks bool) (err error) {
func (cm *committer) commitInternal(cc storage.CommitContext) (err error) {
_, _, uncommittedBegin, _ := cm.lse.lsc.reportCommitBase()
uncommttedLLSNBegin := uncommittedBegin.LLSN

Expand All @@ -236,7 +236,7 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT
// size of commitWaitQueue. For instance, a batch of log entries could be written to the
// storage, however, it is failed to push them into the commitWaitQueue.
// See #VARLOG-444.
if requireCommitWaitTasks && cm.commitWaitQ.size() < numCommits {
if cm.commitWaitQ.size() < numCommits {
return nil
}

Expand Down Expand Up @@ -269,14 +269,10 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT
return err
}

// If requireCommitWaitTasks is true, since the number of tasks in commitWaitQ is
// inspected above, cwt must exist.
// If cwt is null, it means that there is no task in commitWaitQ anymore. When this
// method is executed by SyncReplicate, it is okay for cwt not to exist.
// Since the number of tasks in commitWaitQ is inspected above, cwt
// must exist.
cwt := iter.task()
if cwt != nil {
cwt.awg.setGLSN(glsn)
}
cwt.awg.setGLSN(glsn)

err = cb.Set(llsn, glsn)
if err != nil {
Expand All @@ -295,12 +291,7 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT
// NOTE: This cwt should not be nil, because the size of commitWaitQ is inspected
// above.
cwt := cm.commitWaitQ.pop()
if cwt != nil {
// FIXME (jun): If sync replication is occurred, cwt may be nil.
// It means that above codes tried to pop from empty commitWaitQ.
// It is very difficult to detect bug, so it should be re-organized.
committedTasks = append(committedTasks, cwt)
}
committedTasks = append(committedTasks, cwt)
}

if numCommits > 0 {
Expand All @@ -323,7 +314,6 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT
cwt.release()
}

// NOTE: When sync replication is occurred, it can be zero.
if len(committedTasks) > 0 {
cm.inflightCommitWait.Add(int64(-numCommits))
}
Expand Down

0 comments on commit efdf19f

Please sign in to comment.