Skip to content

Commit

Permalink
Fix bucket.Flush to ensure its atomicity
Browse files Browse the repository at this point in the history
  • Loading branch information
RussellLuo committed Feb 18, 2022
1 parent a56d53c commit 54845bd
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (b *bucket) Add(t *Timer) {

func (b *bucket) remove(t *Timer) bool {
if t.getBucket() != b {
// If remove is called from t.Stop, and this happens just after the timing wheel's goroutine has:
// If remove is called from within t.Stop, and this happens just after the timing wheel's goroutine has:
// 1. removed t from b (through b.Flush -> b.remove)
// 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
// then t.getBucket will return nil for case 1, or ab (non-nil) for case 2.
Expand All @@ -112,23 +112,22 @@ func (b *bucket) Remove(t *Timer) bool {
}

func (b *bucket) Flush(reinsert func(*Timer)) {
var ts []*Timer

b.mu.Lock()
defer b.mu.Unlock()

for e := b.timers.Front(); e != nil; {
next := e.Next()

t := e.Value.(*Timer)
b.remove(t)
ts = append(ts, t)
// Note that this operation will either execute the timer's task, or
// insert the timer into another bucket belonging to a lower-level wheel.
//
// In either case, no further lock operation will happen to b.mu.
reinsert(t)

e = next
}
b.mu.Unlock()

b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()

for _, t := range ts {
reinsert(t)
}
b.SetExpiration(-1)
}

0 comments on commit 54845bd

Please sign in to comment.