From 7672fca7e0dfcad8a1ae5c4e1c0c5280e8f97796 Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Tue, 27 Feb 2024 13:39:13 -0800 Subject: [PATCH 1/2] pubsub: Ensure batcher flushes on shutdown, even if min batch size isn't met This PR ensures that the batcher flushes on shutdown, even if the pending length is less than the min batch size specified. Sending events is preferred to dropping, even if limits are not obeyed. --- pubsub/batcher/batcher.go | 40 ++++++++++++++++++++++++++-------- pubsub/batcher/batcher_test.go | 28 ++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go index 917cef822a..ad1667e0b9 100644 --- a/pubsub/batcher/batcher.go +++ b/pubsub/batcher/batcher.go @@ -200,26 +200,40 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { if b.nHandlers < b.opts.MaxHandlers { // If we can start a handler, do so with the item just added and any others that are pending. batch := b.nextBatch() - if batch != nil { - b.wg.Add(1) - go func() { - b.callHandler(batch) - b.wg.Done() - }() - b.nHandlers++ - } + b.handleBatch(batch) } // If we can't start a handler, then one of the currently running handlers will // take our item. return c } +func (b *Batcher) handleBatch(batch []waiter) { + if batch == nil || len(batch) == 0 { + return + } + + b.wg.Add(1) + go func() { + b.callHandler(batch) + b.wg.Done() + }() + b.nHandlers++ +} + // nextBatch returns the batch to process, and updates b.pending. // It returns nil if there's no batch ready for processing. // b.mu must be held. func (b *Batcher) nextBatch() []waiter { if len(b.pending) < b.opts.MinBatchSize { - return nil + // We handle minimum batch sizes depending on specific + // situations. + // XXX: If we allow max batch lifetimes, handle that here. + if b.shutdown == false { + // If we're not shutting down, respect minimums. If we're + // shutting down, though, we ignore minimums to flush the + // entire batch. + return nil + } } if b.opts.MaxBatchByteSize == 0 && (b.opts.MaxBatchSize == 0 || len(b.pending) <= b.opts.MaxBatchSize) { @@ -283,5 +297,13 @@ func (b *Batcher) Shutdown() { b.mu.Lock() b.shutdown = true b.mu.Unlock() + + // On shutdown, ensure that we attempt to flush any pending items + // if there's a minimum batch size. + if b.nHandlers < b.opts.MaxHandlers { + batch := b.nextBatch() + b.handleBatch(batch) + } + b.wg.Wait() } diff --git a/pubsub/batcher/batcher_test.go b/pubsub/batcher/batcher_test.go index e7c5dd96c1..9b0ee3c055 100644 --- a/pubsub/batcher/batcher_test.go +++ b/pubsub/batcher/batcher_test.go @@ -171,6 +171,34 @@ func TestMinBatchSize(t *testing.T) { } } +// TestMinBatchSizeFlushesOnShutdown ensures that Shutdown() flushes batches, even if +// the pending count is less than the minimum batch size. +func TestMinBatchSizeFlushesOnShutdown(t *testing.T) { + var got [][]int + + batchSize := 3 + + b := batcher.New(reflect.TypeOf(int(0)), &batcher.Options{MinBatchSize: batchSize}, func(items interface{}) error { + got = append(got, items.([]int)) + return nil + }) + for i := 0; i < (batchSize - 1); i++ { + b.AddNoWait(i) + } + + // Ensure that we've received nothing + if len(got) > 0 { + t.Errorf("got batch unexpectedly: %+v", got) + } + + b.Shutdown() + + want := [][]int{{0, 1}} + if !cmp.Equal(got, want) { + t.Errorf("got %+v, want %+v on shutdown", got, want) + } +} + func TestSaturation(t *testing.T) { // Verify that under high load the maximum number of handlers are running. ctx := context.Background() From bfb28adae2dff8fb6fad1bb1cfd517d629fc7626 Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Fri, 1 Mar 2024 15:18:41 -0800 Subject: [PATCH 2/2] Atomically update nHandlers --- pubsub/batcher/batcher.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go index ad1667e0b9..489c311865 100644 --- a/pubsub/batcher/batcher.go +++ b/pubsub/batcher/batcher.go @@ -23,6 +23,7 @@ import ( "errors" "reflect" "sync" + "sync/atomic" ) // Split determines how to split n (representing n items) into batches based on @@ -70,7 +71,7 @@ type Batcher struct { mu sync.Mutex pending []waiter // items waiting to be handled - nHandlers int // number of currently running handler goroutines + nHandlers int32 // number of currently running handler goroutines shutdown bool } @@ -197,7 +198,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { // Add the item to the pending list. b.pending = append(b.pending, waiter{item, c}) - if b.nHandlers < b.opts.MaxHandlers { + if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) { // If we can start a handler, do so with the item just added and any others that are pending. batch := b.nextBatch() b.handleBatch(batch) @@ -217,7 +218,7 @@ func (b *Batcher) handleBatch(batch []waiter) { b.callHandler(batch) b.wg.Done() }() - b.nHandlers++ + atomic.AddInt32(&b.nHandlers, 1) } // nextBatch returns the batch to process, and updates b.pending. @@ -284,7 +285,7 @@ func (b *Batcher) callHandler(batch []waiter) { // always get to run. batch = b.nextBatch() if batch == nil { - b.nHandlers-- + atomic.AddInt32(&b.nHandlers, -1) } b.mu.Unlock() } @@ -300,7 +301,7 @@ func (b *Batcher) Shutdown() { // On shutdown, ensure that we attempt to flush any pending items // if there's a minimum batch size. - if b.nHandlers < b.opts.MaxHandlers { + if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) { batch := b.nextBatch() b.handleBatch(batch) }