From 93894ffc5c0e35ae2acb92b86ab70742e7e60ea0 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 16 Jan 2024 12:54:55 -0500 Subject: [PATCH] Work through validation of batch logic Signed-off-by: Peter Broadhurst --- docs/reference/config.md | 4 +- internal/events/event_dispatcher_test.go | 165 +++++++++++++++++++++++ internal/events/event_poller_test.go | 59 ++++++++ 3 files changed, 226 insertions(+), 2 deletions(-) diff --git a/docs/reference/config.md b/docs/reference/config.md index 85524723d2..45f63e89b5 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -224,7 +224,7 @@ nav_order: 2 |Key|Description|Type|Default Value| |---|-----------|----|-------------| |batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200` -|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms` +|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms` |firstEvent|The first event the aggregator should process, if no previous offest is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest` |pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` |rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000` @@ -249,7 +249,7 @@ nav_order: 2 |Key|Description|Type|Default Value| |---|-----------|----|-------------| -|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms` +|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms` |bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5` |pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index d1cd02b233..a056c32518 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -362,6 +362,171 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) { mdm.AssertExpectations(t) } +func TestEventDispatcherBatchBased(t *testing.T) { + log.SetLevel("debug") + three := uint16(3) + longTime := "1m" + subID := fftypes.NewUUID() + truthy := true + sub := &subscription{ + dispatcherElection: make(chan bool, 1), + definition: &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"}, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + Batch: &truthy, + ReadAhead: &three, + BatchTimeout: &longTime, // because the batch should fill + }, + }, + }, + eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)), + } + + ed, cancel := newTestEventDispatcher(sub) + defer cancel() + go ed.deliverEvents() + ed.eventPoller.offsetCommitted = make(chan int64, 3) + mdi := ed.database.(*databasemocks.Plugin) + mei := ed.transport.(*eventsmocks.Plugin) + mdm := ed.data.(*datamocks.Manager) + + eventDeliveries := make(chan []*core.CombinedEventDataDelivery) + deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + deliveryRequestMock.RunFn = func(a mock.Arguments) { + eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery) + } + + // Setup the IDs + ref1 := fftypes.NewUUID() + ev1 := fftypes.NewUUID() + ref2 := fftypes.NewUUID() + ev2 := fftypes.NewUUID() + ref3 := fftypes.NewUUID() + ev3 := fftypes.NewUUID() + ref4 := fftypes.NewUUID() + ev4 := fftypes.NewUUID() + + // Setup enrichment + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{ + Header: core.MessageHeader{ID: ref1}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{ + Header: core.MessageHeader{ID: ref2}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{ + Header: core.MessageHeader{ID: ref3}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{ + Header: core.MessageHeader{ID: ref4}, + }, nil, true, nil) + + // Deliver a batch of messages + batch1Done := make(chan struct{}) + go func() { + repoll, err := ed.bufferedDelivery([]core.LocallySequenced{ + &core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match + &core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected}, + &core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match + &core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match + }) + assert.NoError(t, err) + assert.True(t, repoll) + close(batch1Done) + }() + + // Expect to get the batch dispatched - with the three matching events + events := <-eventDeliveries + assert.Len(t, events, 3) + assert.Equal(t, *ev1, *events[0].Event.ID) + assert.Equal(t, *ref1, *events[0].Event.Message.Header.ID) + assert.Equal(t, *ev3, *events[1].Event.ID) + assert.Equal(t, *ref3, *events[1].Event.Message.Header.ID) + assert.Equal(t, *ev4, *events[2].Event.ID) + assert.Equal(t, *ref4, *events[2].Event.Message.Header.ID) + + // Ack the batch + go func() { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[0].Event.ID}) + ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[1].Event.ID}) + ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[2].Event.ID}) + }() + + assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted) + assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted) + assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted) + + // This should complete the batch + <-batch1Done + + mdi.AssertExpectations(t) + mei.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestEventDispatcherBatchDispatchFail(t *testing.T) { + log.SetLevel("debug") + two := uint16(2) + longTime := "1m" + subID := fftypes.NewUUID() + truthy := true + sub := &subscription{ + dispatcherElection: make(chan bool, 1), + definition: &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"}, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + Batch: &truthy, + ReadAhead: &two, + BatchTimeout: &longTime, // because the batch should fill + }, + }, + }, + eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)), + } + + ed, cancel := newTestEventDispatcher(sub) + defer cancel() + go ed.deliverEvents() + ed.eventPoller.offsetCommitted = make(chan int64, 3) + mdi := ed.database.(*databasemocks.Plugin) + mei := ed.transport.(*eventsmocks.Plugin) + mdm := ed.data.(*datamocks.Manager) + + eventDeliveries := make(chan []*core.CombinedEventDataDelivery) + deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + deliveryRequestMock.RunFn = func(a mock.Arguments) { + eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery) + } + + // Deliver a batch of messages + batch1Done := make(chan struct{}) + go func() { + repoll, err := ed.bufferedDelivery([]core.LocallySequenced{ + &core.Event{ID: fftypes.NewUUID(), Sequence: 10000001, Type: core.EventTypeMessageConfirmed}, + &core.Event{ID: fftypes.NewUUID(), Sequence: 10000002, Type: core.EventTypeMessageConfirmed}, + }) + assert.NoError(t, err) + assert.True(t, repoll) + close(batch1Done) + }() + + mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(&core.Message{ + Header: core.MessageHeader{ID: fftypes.NewUUID()}, + }, nil, true, nil) + + // Expect to get the batch dispatched - with the three matching events + events := <-eventDeliveries + assert.Len(t, events, 2) + + // This should complete the batch + <-batch1Done + + mdi.AssertExpectations(t) + mei.AssertExpectations(t) + mdm.AssertExpectations(t) +} + func TestEnrichEventsFailGetMessages(t *testing.T) { sub := &subscription{ diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go index 04f2ac8fc4..de080a5a2e 100644 --- a/internal/events/event_poller_test.go +++ b/internal/events/event_poller_test.go @@ -219,6 +219,57 @@ func TestReadPageSingleCommitEvent(t *testing.T) { mdi.AssertExpectations(t) } +func TestReadPageBatchTimeoutNotFull(t *testing.T) { + mdi := &databasemocks.Plugin{} + processEventCalled := make(chan []core.LocallySequenced, 1) + ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + processEventCalled <- events + return false, nil + }, nil) + ep.conf.eventBatchTimeout = 1 * time.Microsecond + ep.conf.eventBatchSize = 3 + ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1}, nil, nil).Once() // half batch + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) { + ep.shoulderTap() + }).Once() + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) { + cancel() + }) + ep.eventLoop() + + events := <-processEventCalled + assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID) + assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID) + mdi.AssertExpectations(t) +} + +func TestReadPageBatchFull(t *testing.T) { + mdi := &databasemocks.Plugin{} + processEventCalled := make(chan []core.LocallySequenced, 1) + ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + processEventCalled <- events + return false, nil + }, nil) + ep.conf.eventBatchTimeout = 1 * time.Microsecond + ep.conf.eventBatchSize = 2 + ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) { + ep.shoulderTap() + }).Once() + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) { + cancel() + }) + ep.eventLoop() + + events := <-processEventCalled + assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID) + assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID) + mdi.AssertExpectations(t) +} + func TestReadPageRewind(t *testing.T) { mdi := &databasemocks.Plugin{} processEventCalled := make(chan core.LocallySequenced, 1) @@ -325,6 +376,14 @@ func TestDoubleTap(t *testing.T) { ep.shoulderTap() // this should not block } +func TestWaitForBatchTimeoutClosedContext(t *testing.T) { + mdi := &databasemocks.Plugin{} + ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep.conf.eventBatchTimeout = 1 * time.Minute + cancel() + ep.waitForBatchTimeout() +} + func TestDoubleConfirm(t *testing.T) { mdi := &databasemocks.Plugin{} ep, cancel := newTestEventPoller(t, mdi, nil, nil)