PR 1424 and 1447 #100

Merged · 10 commits · Jan 19, 2024
7 changes: 4 additions & 3 deletions docs/reference/config.md
@@ -224,7 +224,7 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|firstEvent|The first event the aggregator should process, if no previous offset is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000`
@@ -249,7 +249,7 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

@@ -1356,7 +1356,8 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`0`
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50`
|batchTimeout|Default batch timeout|`int`|`50ms`
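
As a hedged illustration of how these new defaults are consumed, the sketch below (not part of this diff; it uses the `pkg/core` types changed later in this PR) creates a subscription that opts into batch delivery and leaves `ReadAhead`/`BatchTimeout` unset, so `parseSubscriptionDef` fills them in from `subscription.defaults` as `50` and `"50ms"`:

```go
package main

import (
	"fmt"

	"github.com/hyperledger/firefly-common/pkg/fftypes"
	"github.com/hyperledger/firefly/pkg/core"
)

func main() {
	batch := true
	sub := &core.Subscription{
		SubscriptionRef: core.SubscriptionRef{
			ID:        fftypes.NewUUID(),
			Namespace: "ns1",
			Name:      "batched-sub",
		},
		Transport: "websockets",
		Options: core.SubscriptionOptions{
			SubscriptionCoreOptions: core.SubscriptionCoreOptions{
				// ReadAhead and BatchTimeout are left nil, so the new
				// subscription.defaults (50 / "50ms") apply in batch mode
				Batch: &batch,
			},
		},
	}
	fmt.Println(sub.Name)
}
```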

## subscription.retry

2 changes: 1 addition & 1 deletion docs/reference/types/wsack.md
@@ -37,7 +37,7 @@ nav_order: 24

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `id` | WSAck.id | [`UUID`](simpletypes#uuid) |
| `subscription` | WSAck.subscription | [`SubscriptionRef`](#subscriptionref) |

2 changes: 1 addition & 1 deletion docs/reference/types/wserror.md
@@ -33,6 +33,6 @@ nav_order: 25

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `error` | WSAck.error | `string` |

2 changes: 1 addition & 1 deletion docs/reference/types/wsstart.md
@@ -42,7 +42,7 @@ nav_order: 23

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `autoack` | WSStart.autoack | `bool` |
| `namespace` | WSStart.namespace | `string` |
| `name` | WSStart.name | `string` |
15 changes: 9 additions & 6 deletions internal/coreconfig/coreconfig.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -327,8 +327,10 @@ var (
OrgDescription = ffc("org.description")
// OrchestratorStartupAttempts is how many times to attempt to connect to core infrastructure on startup
OrchestratorStartupAttempts = ffc("orchestrator.startupAttempts")
// SubscriptionDefaultsReadAhead default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsReadAhead = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchSize default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsBatchSize = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchTimeout default batch timeout
SubscriptionDefaultsBatchTimeout = ffc("subscription.defaults.batchTimeout")
// SubscriptionMax maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)
SubscriptionMax = ffc("subscription.max")
// SubscriptionsRetryInitialDelay is the initial retry delay
@@ -404,7 +406,7 @@ func setDefaults() {
viper.SetDefault(string(DownloadRetryFactor), 2.0)
viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest)
viper.SetDefault(string(EventAggregatorBatchSize), 200)
viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms")
viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms")
viper.SetDefault(string(EventAggregatorPollTimeout), "30s")
viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms")
viper.SetDefault(string(EventAggregatorRewindQueueLength), 10)
@@ -414,7 +416,7 @@ func setDefaults() {
viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s")
viper.SetDefault(string(EventDBEventsBufferSize), 100)
viper.SetDefault(string(EventDispatcherBufferLength), 5)
viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms")
viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms")
viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
viper.SetDefault(string(EventTransportsDefault), "websockets")
@@ -451,7 +453,8 @@ func setDefaults() {
viper.SetDefault(string(PrivateMessagingBatchSize), 200)
viper.SetDefault(string(PrivateMessagingBatchTimeout), "1s")
viper.SetDefault(string(PrivateMessagingBatchPayloadLimit), "800Kb")
viper.SetDefault(string(SubscriptionDefaultsReadAhead), 0)
viper.SetDefault(string(SubscriptionDefaultsBatchSize), 50)
viper.SetDefault(string(SubscriptionDefaultsBatchTimeout), "50ms")
viper.SetDefault(string(SubscriptionMax), 500)
viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
7 changes: 4 additions & 3 deletions internal/coremsgs/en_config_descriptions.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -383,8 +383,9 @@ var (
ConfigPluginSharedstorageIpfsGatewayURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType)
ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType)

ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType)

ConfigTokensName = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType)
ConfigTokensPlugin = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType)
153 changes: 69 additions & 84 deletions internal/events/event_dispatcher.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -65,25 +65,21 @@ type eventDispatcher struct {
elected bool
eventPoller *eventPoller
inflight map[fftypes.UUID]*core.Event
eventDelivery chan *core.EventDelivery
eventDelivery chan []*core.EventDelivery
mux sync.Mutex
namespace string
readAhead int
batch bool
batchTimeout time.Duration
subscription *subscription
txHelper txcommon.Helper
}

func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.Plugin, di database.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, connID string, sub *subscription, en *eventNotifier, txHelper txcommon.Helper) *eventDispatcher {
ctx, cancelCtx := context.WithCancel(ctx)
readAhead := config.GetUint(coreconfig.SubscriptionDefaultsReadAhead)
readAhead := uint(0)
if sub.definition.Options.ReadAhead != nil {
readAhead = uint(*sub.definition.Options.ReadAhead)
}
if readAhead > maxReadAhead {
readAhead = maxReadAhead
}

batchTimeout := defaultBatchTimeout
if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" {
@@ -108,13 +104,12 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
subscription: sub,
namespace: sub.definition.Namespace,
inflight: make(map[fftypes.UUID]*core.Event),
eventDelivery: make(chan *core.EventDelivery, readAhead+1),
eventDelivery: make(chan []*core.EventDelivery, readAhead+1),
readAhead: int(readAhead),
acksNacks: make(chan ackNack),
closed: make(chan struct{}),
txHelper: txHelper,
batch: batch,
batchTimeout: batchTimeout,
}

pollerConf := &eventPollerConf{
@@ -138,6 +133,20 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
firstEvent: sub.definition.Options.FirstEvent,
}

// Users can tune the batch-related settings.
// These always apply in batch:true cases, and the batchTimeout setting can optionally
// be used to tweak how we optimize for readahead / latency detection without batching
// (though users with that requirement would most likely be better served by moving to batch:true).
if batchTimeout > 0 {
pollerConf.eventBatchTimeout = batchTimeout
if batchTimeout > pollerConf.eventPollTimeout {
pollerConf.eventPollTimeout = batchTimeout
}
}
if batch || pollerConf.eventBatchSize < int(readAhead) {
pollerConf.eventBatchSize = ed.readAhead
}

ed.eventPoller = newEventPoller(ctx, di, en, pollerConf)
return ed
}
@@ -165,11 +174,8 @@ func (ed *eventDispatcher) electAndStart() {
ed.elected = true
ed.eventPoller.start()

if ed.batch {
go ed.deliverBatchedEvents()
} else {
go ed.deliverEvents()
}
go ed.deliverEvents()

// Wait until the event poller closes
<-ed.eventPoller.closed
}
@@ -326,7 +332,15 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
ed.mux.Unlock()

dispatched++
ed.eventDelivery <- event
if !ed.batch {
// dispatch individually
ed.eventDelivery <- []*core.EventDelivery{event}
}
}

if ed.batch && len(dispatchable) > 0 {
// Dispatch the whole batch now marked in-flight
ed.eventDelivery <- dispatchable
}

if inflightCount == 0 {
@@ -384,88 +398,59 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
}
}

func (ed *eventDispatcher) deliverBatchedEvents() {
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData

var events []*core.CombinedEventDataDelivery
var batchTimeoutContext context.Context
var batchTimeoutCancel func()
for {
var timeoutContext context.Context
var timedOut bool
if batchTimeoutContext != nil {
timeoutContext = batchTimeoutContext
} else {
timeoutContext = ed.ctx
}
select {
case event, ok := <-ed.eventDelivery:
case events, ok := <-ed.eventDelivery:
if !ok {
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if events == nil {
events = []*core.CombinedEventDataDelivery{}
batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout)
}

log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)

var data []*core.Data
// As soon as we hit an error, we need to trigger into nack mode
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
}

events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data})

if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
}

case <-timeoutContext.Done():
timedOut = true
case <-ed.ctx.Done():
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if len(events) == ed.readAhead || (timedOut && len(events) > 0) {
_ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events)
// If err handle all the delivery responses for all the events??
events = nil
}
}
}

// TODO issue here, we can't just call DeliveryRequest with one thing.
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
for {
select {
case event, ok := <-ed.eventDelivery:
if !ok {
return
// Loop through the events enriching them, and dispatching individually in non-batch mode
eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
for i := 0; i < len(events); i++ {
e := &core.CombinedEventDataDelivery{
Event: events[i],
}
eventsWithData[i] = e
// The first error we encounter stops us attempting to enrich or dispatch any more events
if err == nil {
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
}
// If we are non-batched, we have to deliver each event individually...
if !ed.batch {
// .. only attempt to deliver if we've not triggered into an error scenario for one of the events already
if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
}
// ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
var data []*core.Data
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
// In batch mode we do one dispatch of the whole set as one
if ed.batch {
// Only attempt to deliver if we're in a non error case (enrich might have failed above)
if err == nil {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
}
// If we're in an error case we have to nack everything immediately
if err != nil {
for _, e := range events {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
}
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
}
case <-ed.ctx.Done():
return
}
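
To make the error-handling contract of the unified `deliverEvents` loop concrete, here is a toy, self-contained sketch (stand-in names, not FireFly APIs): the first failure stops any further delivery attempts, and that event plus every later one in the page is nacked immediately:

```go
package main

import (
	"errors"
	"fmt"
)

// deliver stands in for transport.DeliveryRequest in this toy
func deliver(id int) error {
	if id == 3 {
		return errors.New("connection dropped")
	}
	return nil
}

func main() {
	var err error
	for _, id := range []int{1, 2, 3, 4, 5} {
		if err == nil {
			err = deliver(id)
		}
		if err != nil {
			// stand-in for deliveryResponse(..., Rejected: true)
			fmt.Printf("event %d nacked\n", id)
		} else {
			fmt.Printf("event %d delivered\n", id)
		}
	}
}
```

Events 1 and 2 are delivered; 3, 4, and 5 are all rejected once the error is hit, matching the nack-the-rest behavior above.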
368 changes: 177 additions & 191 deletions internal/events/event_dispatcher_test.go

Large diffs are not rendered by default.

40 changes: 25 additions & 15 deletions internal/events/event_poller.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -196,7 +196,12 @@ func (ep *eventPoller) eventLoop() {
close(ep.offsetCommitted)
}()

doBatchDelay := false
for {
if doBatchDelay {
ep.waitForBatchTimeout()
}

// Read messages from the DB - in an error condition we retry until success, or a closed context
events, err := ep.readPage()
if err != nil {
@@ -205,6 +210,15 @@ func (ep *eventPoller) eventLoop() {
}

eventCount := len(events)

// We might want to wait for the batch to fill - so we delay and re-poll
if ep.conf.eventBatchTimeout > 0 && !doBatchDelay && eventCount < ep.conf.eventBatchSize {
doBatchDelay = true
l.Tracef("Batch delay: detected=%d, batchSize=%d batchTimeout=%s", eventCount, ep.conf.eventBatchSize, ep.conf.eventBatchTimeout)
continue
}
doBatchDelay = false // clear any batch delay for next iteration

repoll := false
if eventCount > 0 {
// We process all the events in the page in a single database run group, and
@@ -280,6 +294,16 @@ func (ep *eventPoller) shoulderTap() {
}
}

func (ep *eventPoller) waitForBatchTimeout() {
// For throughput-optimized environments, an eventBatchTimeout can be set so that
// an incomplete batch is given this short window to fill before being dispatched,
// rather than waiting the long timeout between polling cycles (at the cost of some dispatch latency)
select {
case <-time.After(ep.conf.eventBatchTimeout):
case <-ep.ctx.Done():
}
}

func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool {
l := log.L(ep.ctx)
longTimeoutDuration := ep.conf.eventPollTimeout
@@ -289,20 +313,6 @@ func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool
return true
}

// For throughput optimized environments, we can set an eventBatchingTimeout to allow messages to arrive
// between polling cycles (at the cost of some dispatch latency)
if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 {
shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout)
select {
case <-shortTimeout.C:
l.Tracef("Woken after batch timeout")
case <-ep.ctx.Done():
l.Debugf("Exiting due to cancelled context")
return false
}
longTimeoutDuration -= ep.conf.eventBatchTimeout
}

longTimeout := time.NewTimer(longTimeoutDuration)
select {
case <-longTimeout.C:
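
A toy, self-contained sketch (stand-in data, not the real poller) of the delay-and-re-poll pattern `eventLoop` now uses: a page that comes back under `eventBatchSize` triggers one wait of `eventBatchTimeout`, and the re-poll from the same uncommitted offset picks up any events that arrived in the meantime:

```go
package main

import (
	"fmt"
	"time"
)

// readPage is a stand-in for the poller's DB read: up to batchSize
// events starting at the (uncommitted) offset.
func readPage(db []int, offset, batchSize int) []int {
	end := offset + batchSize
	if end > len(db) {
		end = len(db)
	}
	return db[offset:end]
}

func main() {
	const batchSize = 3
	const batchTimeout = 50 * time.Millisecond
	db := []int{1, 2} // only two events have arrived so far

	page := readPage(db, 0, batchSize)
	if len(page) < batchSize {
		time.Sleep(batchTimeout)          // waitForBatchTimeout()
		db = append(db, 3)                // a late event arrives during the delay
		page = readPage(db, 0, batchSize) // the single re-poll now sees a full batch
	}
	fmt.Println("dispatching batch:", page) // [1 2 3]
}
```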
61 changes: 60 additions & 1 deletion internal/events/event_poller_test.go
@@ -36,7 +36,7 @@ func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHa
ctx, cancel := context.WithCancel(context.Background())
ep = newEventPoller(ctx, mdi, newEventNotifier(ctx, "ut"), &eventPollerConf{
eventBatchSize: 10,
eventBatchTimeout: 1 * time.Millisecond,
eventBatchTimeout: 0, // customized for individual tests that enable this
eventPollTimeout: 10 * time.Second,
startupOffsetRetryAttempts: 1,
retry: retry.Retry{
@@ -219,6 +219,57 @@ func TestReadPageSingleCommitEvent(t *testing.T) {
mdi.AssertExpectations(t)
}

func TestReadPageBatchTimeoutNotFull(t *testing.T) {
mdi := &databasemocks.Plugin{}
processEventCalled := make(chan []core.LocallySequenced, 1)
ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
processEventCalled <- events
return false, nil
}, nil)
ep.conf.eventBatchTimeout = 1 * time.Microsecond
ep.conf.eventBatchSize = 3
ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1}, nil, nil).Once() // half batch
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
ep.shoulderTap()
}).Once()
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
cancel()
})
ep.eventLoop()

events := <-processEventCalled
assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
mdi.AssertExpectations(t)
}

func TestReadPageBatchFull(t *testing.T) {
mdi := &databasemocks.Plugin{}
processEventCalled := make(chan []core.LocallySequenced, 1)
ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
processEventCalled <- events
return false, nil
}, nil)
ep.conf.eventBatchTimeout = 1 * time.Microsecond
ep.conf.eventBatchSize = 2
ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
ep.shoulderTap()
}).Once()
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
cancel()
})
ep.eventLoop()

events := <-processEventCalled
assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
mdi.AssertExpectations(t)
}

func TestReadPageRewind(t *testing.T) {
mdi := &databasemocks.Plugin{}
processEventCalled := make(chan core.LocallySequenced, 1)
@@ -325,6 +376,14 @@ func TestDoubleTap(t *testing.T) {
ep.shoulderTap() // this should not block
}

func TestWaitForBatchTimeoutClosedContext(t *testing.T) {
mdi := &databasemocks.Plugin{}
ep, cancel := newTestEventPoller(t, mdi, nil, nil)
ep.conf.eventBatchTimeout = 1 * time.Minute
cancel()
ep.waitForBatchTimeout()
}

func TestDoubleConfirm(t *testing.T) {
mdi := &databasemocks.Plugin{}
ep, cancel := newTestEventPoller(t, mdi, nil, nil)
20 changes: 19 additions & 1 deletion internal/events/subscription_manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -20,6 +20,7 @@ import (
"context"
"regexp"
"sync"
"time"

"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/fftypes"
@@ -90,6 +91,9 @@ type subscriptionManager struct {
newOrUpdatedSubscriptions chan *fftypes.UUID
deletedSubscriptions chan *fftypes.UUID
retry retry.Retry

defaultBatchSize uint16
defaultBatchTimeout time.Duration
}

func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *eventEnricher, di database.Plugin, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager, txHelper txcommon.Helper, transports map[string]events.Plugin) (*subscriptionManager, error) {
@@ -116,6 +120,8 @@ func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *e
MaximumDelay: config.GetDuration(coreconfig.SubscriptionsRetryMaxDelay),
Factor: config.GetFloat64(coreconfig.SubscriptionsRetryFactor),
},
defaultBatchSize: uint16(config.GetInt(coreconfig.SubscriptionDefaultsBatchSize)),
defaultBatchTimeout: config.GetDuration(coreconfig.SubscriptionDefaultsBatchTimeout),
}

for _, ei := range sm.transports {
@@ -270,6 +276,18 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef
subDef.Options.TLSConfig = sm.namespace.TLSConfigs[subDef.Options.TLSConfigName]
}

// Defaults that only apply in batch mode
if subDef.Options.Batch != nil && *subDef.Options.Batch {
if subDef.Options.ReadAhead == nil || *subDef.Options.ReadAhead == 0 {
defaultBatchSize := sm.defaultBatchSize
subDef.Options.ReadAhead = &defaultBatchSize
}
if subDef.Options.BatchTimeout == nil || *subDef.Options.BatchTimeout == "" {
defaultBatchTimeout := sm.defaultBatchTimeout.String()
subDef.Options.BatchTimeout = &defaultBatchTimeout
}
}

if err := transport.ValidateOptions(ctx, &subDef.Options); err != nil {
return nil, err
}
24 changes: 24 additions & 0 deletions internal/events/subscription_manager_test.go
@@ -551,6 +551,30 @@ func TestCreateSubscriptionSuccessTLSConfig(t *testing.T) {
assert.NotNil(t, sub.definition.Options.TLSConfig)
}

func TestCreateSubscriptionSuccessBatch(t *testing.T) {
coreconfig.Reset()

mei := &eventsmocks.Plugin{}
sm, cancel := newTestSubManager(t, mei)
defer cancel()

mei.On("GetFFRestyConfig", mock.Anything).Return(&ffresty.Config{})
mei.On("ValidateOptions", mock.Anything, mock.Anything).Return(nil)
truthy := true
sub, err := sm.parseSubscriptionDef(sm.ctx, &core.Subscription{
Options: core.SubscriptionOptions{
SubscriptionCoreOptions: core.SubscriptionCoreOptions{
Batch: &truthy,
},
},
Transport: "ut",
})
assert.NoError(t, err)

assert.Equal(t, uint16(50), *sub.definition.Options.ReadAhead)
assert.Equal(t, "50ms", *sub.definition.Options.BatchTimeout)
}

func TestCreateSubscriptionWithDeprecatedFilters(t *testing.T) {
mei := &eventsmocks.Plugin{}
sm, cancel := newTestSubManager(t, mei)
3 changes: 1 addition & 2 deletions internal/events/websockets/config.go
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -32,5 +32,4 @@ const (
func (ws *WebSockets) InitConfig(config config.Section) {
config.AddKnownKey(ReadBufferSize, bufferSizeDefault)
config.AddKnownKey(WriteBufferSize, bufferSizeDefault)

}
114 changes: 108 additions & 6 deletions internal/events/websockets/websocket_connection.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -21,6 +21,8 @@ import (
"encoding/json"
"io"
"net/http"
"net/url"
"strconv"
"sync"
"time"

@@ -49,6 +51,7 @@ type websocketConnection struct {
autoAck bool
started []*websocketStartedSub
inflight []*core.EventDeliveryResponse
inflightBatches []*core.WSEventBatch
mux sync.Mutex
closed bool
remoteAddr string
@@ -94,28 +97,59 @@ func (wc *websocketConnection) assertNamespace(namespace string) (string, error)
return namespace, nil
}

func isBoolQuerySet(query url.Values, boolOption string) bool {
optionValues, hasOptionValues := query[boolOption]
return hasOptionValues && (len(optionValues) == 0 || optionValues[0] != "false")
}

func (wc *websocketConnection) getReadAhead(query url.Values, isBatch bool) *uint16 {
readaheadStr := query.Get("readahead")
if readaheadStr != "" {
readAheadInt, err := strconv.ParseUint(readaheadStr, 10, 16)
if err == nil {
readahead := uint16(readAheadInt)
return &readahead
}
}
return nil
}

func (wc *websocketConnection) getBatchTimeout(query url.Values) *string {
batchTimeout := query.Get("batchtimeout")
if batchTimeout != "" {
return &batchTimeout
}
return nil
}

// processAutoStart is a helper that handles query parameters allowing a subscription to be auto-started on connect
func (wc *websocketConnection) processAutoStart(req *http.Request) {
query := req.URL.Query()
ephemeral, hasEphemeral := req.URL.Query()["ephemeral"]
isEphemeral := hasEphemeral && (len(ephemeral) == 0 || ephemeral[0] != "false")
isEphemeral := isBoolQuerySet(query, "ephemeral")
_, hasName := query["name"]
autoAck, hasAutoack := req.URL.Query()["autoack"]
isAutoack := hasAutoack && (len(autoAck) == 0 || autoAck[0] != "false")
isAutoack := isBoolQuerySet(query, "autoack")
namespace, err := wc.assertNamespace(query.Get("namespace"))
if err != nil {
wc.protocolError(err)
return
}

if hasEphemeral || hasName {
if isEphemeral || hasName {
isBatch := isBoolQuerySet(query, "batch")
filter := core.NewSubscriptionFilterFromQuery(query)
err := wc.handleStart(&core.WSStart{
AutoAck: &isAutoack,
Ephemeral: isEphemeral,
Namespace: namespace,
Name: query.Get("name"),
Filter: filter,
Options: core.SubscriptionOptions{
SubscriptionCoreOptions: core.SubscriptionCoreOptions{
Batch: &isBatch,
BatchTimeout: wc.getBatchTimeout(query),
ReadAhead: wc.getReadAhead(query, isBatch),
},
},
})
if err != nil {
wc.protocolError(err)
@@ -231,6 +265,54 @@ func (wc *websocketConnection) dispatch(event *core.EventDelivery) error {
return nil
}

func (wc *websocketConnection) dispatchBatch(sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
inflightBatch := &core.WSEventBatch{
Type: core.WSEventBatchType,
ID: fftypes.NewUUID(),
Events: make([]*core.EventDelivery, len(events)),
}
if sub != nil {
inflightBatch.Subscription = sub.SubscriptionRef
}
for i, e := range events {
// For ephemeral subscriptions there's no sub, so we pick up the subscription reference from the first event
if inflightBatch.Subscription.Namespace == "" {
inflightBatch.Subscription = e.Event.Subscription
}
inflightBatch.Events[i] = e.Event
}

var autoAck bool
wc.mux.Lock()
autoAck = wc.autoAck
if !autoAck {
wc.inflightBatches = append(wc.inflightBatches, inflightBatch)
}
wc.mux.Unlock()

err := wc.send(inflightBatch)
if err != nil {
return err
}

if autoAck {
wc.ackBatch(inflightBatch)
}

return nil
}

func (wc *websocketConnection) ackBatch(batch *core.WSEventBatch) {
for _, e := range batch.Events {
// We individually drive an ack back on each event, but do so in one pass
// (this matches the webhook implementation of batching).
wc.ws.ack(wc.connID, &core.EventDeliveryResponse{
ID: e.ID,
Subscription: batch.Subscription,
})
}
}

func (wc *websocketConnection) protocolError(err error) {
log.L(wc.ctx).Errorf("Sending protocol error to client: %s", err)
sendErr := wc.send(&core.WSError{
@@ -309,6 +391,22 @@ func (wc *websocketConnection) durableSubMatcher(sr core.SubscriptionRef) bool {
return false
}

func (wc *websocketConnection) handleBatchAck(ack *core.WSAck) (handled bool) {
wc.mux.Lock()
defer wc.mux.Unlock()
var newInflightBatches []*core.WSEventBatch
for _, batch := range wc.inflightBatches {
if batch.ID.Equals(ack.ID) { // nil safe check
wc.ackBatch(batch)
handled = true
} else {
newInflightBatches = append(newInflightBatches, batch)
}
}
wc.inflightBatches = newInflightBatches
return handled
}

func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryResponse, error) {
l := log.L(wc.ctx)
var inflight *core.EventDeliveryResponse
@@ -363,6 +461,10 @@ func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryRes
}

func (wc *websocketConnection) handleAck(ack *core.WSAck) error {
if handled := wc.handleBatchAck(ack); handled {
return nil
}

// Perform a locked set of checks
inflight, err := wc.checkAck(ack)
if err != nil {
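
For reviewers, a hypothetical client-side sketch (using github.com/gorilla/websocket, which is not part of this PR) of a connection exercising the new auto-start parameters; per `isBoolQuerySet`, a flag that is present without a value (or with any value other than `"false"`) counts as true:

```go
package main

import (
	"log"
	"net/url"

	"github.com/gorilla/websocket"
)

func main() {
	// Host/port and the "50ms" duration format are assumptions for illustration
	u := url.URL{
		Scheme:   "ws",
		Host:     "localhost:5000",
		Path:     "/ws",
		RawQuery: "namespace=ns1&ephemeral&autoack&batch&readahead=50&batchtimeout=50ms",
	}
	conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// events now arrive as event_batch frames, auto-acked server side
}
```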
19 changes: 13 additions & 6 deletions internal/events/websockets/websockets.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -54,9 +54,11 @@ func (ws *WebSockets) Name() string { return "websockets" }

func (ws *WebSockets) Init(ctx context.Context, config config.Section) error {
*ws = WebSockets{
ctx: ctx,
connections: make(map[string]*websocketConnection),
capabilities: &events.Capabilities{},
ctx: ctx,
connections: make(map[string]*websocketConnection),
capabilities: &events.Capabilities{
BatchDelivery: true,
},
callbacks: callbacks{
handlers: make(map[string]events.Callbacks),
},
@@ -242,6 +244,11 @@ func (ws *WebSockets) GetStatus() *core.WebSocketStatus {
}

func (ws *WebSockets) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
// We should have rejected creation of the subscription, due to us not supporting this in our capabilities
return i18n.NewError(ctx, coremsgs.MsgBatchDeliveryNotSupported, ws.Name())
ws.connMux.Lock()
conn, ok := ws.connections[connID]
ws.connMux.Unlock()
if !ok {
return i18n.NewError(ctx, coremsgs.MsgWSConnectionNotActive, connID)
}
return conn.dispatchBatch(sub, events)
}
361 changes: 300 additions & 61 deletions internal/events/websockets/websockets_test.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions internal/namespace/configreload.go
@@ -61,6 +61,8 @@ func (nm *namespaceManager) configFileChanged() {
}

func (nm *namespaceManager) configReloaded(ctx context.Context) {
// Always make sure log level is up to date
log.SetLevel(config.GetString(config.LogLevel))

// Get Viper to dump the whole new config, with everything resolved across env vars
// and the config file etc.
9 changes: 8 additions & 1 deletion pkg/core/subscription.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -88,6 +88,7 @@ const (
)

// SubscriptionCoreOptions are the core options that apply across all transports
// REMEMBER TO ADD OPTIONS HERE TO MarshalJSON()
type SubscriptionCoreOptions struct {
FirstEvent *SubOptsFirstEvent `ffstruct:"SubscriptionCoreOptions" json:"firstEvent,omitempty"`
ReadAhead *uint16 `ffstruct:"SubscriptionCoreOptions" json:"readAhead,omitempty"`
@@ -167,6 +168,12 @@ func (so SubscriptionOptions) MarshalJSON() ([]byte, error) {
if so.TLSConfigName != "" {
so.additionalOptions["tlsConfigName"] = so.TLSConfigName
}
if so.Batch != nil {
so.additionalOptions["batch"] = so.Batch
}
if so.BatchTimeout != nil {
so.additionalOptions["batchTimeout"] = so.BatchTimeout
}

return json.Marshal(&so.additionalOptions)
}
22 changes: 18 additions & 4 deletions pkg/core/subscription_test.go
@@ -28,12 +28,15 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) {
firstEvent := SubOptsFirstEventNewest
readAhead := uint16(50)
yes := true
oneSec := "1s"
sub1 := &Subscription{
Options: SubscriptionOptions{
SubscriptionCoreOptions: SubscriptionCoreOptions{
FirstEvent: &firstEvent,
ReadAhead: &readAhead,
WithData: &yes,
FirstEvent: &firstEvent,
ReadAhead: &readAhead,
WithData: &yes,
Batch: &yes,
BatchTimeout: &oneSec,
},
WebhookSubOptions: WebhookSubOptions{
TLSConfigName: "myconfig",
@@ -49,7 +52,18 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) {
// Verify it serializes as bytes to the database
b1, err := sub1.Options.Value()
assert.NoError(t, err)
assert.Equal(t, `{"firstEvent":"newest","my-nested-opts":{"myopt1":12345,"myopt2":"test"},"readAhead":50,"tlsConfigName":"myconfig","withData":true}`, string(b1.([]byte)))
assert.JSONEq(t, `{
"firstEvent":"newest",
"my-nested-opts":{
"myopt1":12345,
"myopt2":"test"
},
"readAhead":50,
"tlsConfigName":"myconfig",
"withData":true,
"batch":true,
"batchTimeout":"1s"
}`, string(b1.([]byte)))

f1, err := sub1.Filter.Value()
assert.NoError(t, err)
14 changes: 13 additions & 1 deletion pkg/core/websocket_actions.go
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -29,6 +29,9 @@ var (

// WSProtocolErrorEventType is a special event "type" field for server to send the client, if it performs a ProtocolError
WSProtocolErrorEventType = fftypes.FFEnumValue("wstype", "protocol_error")

// WSEventBatchType is the type set when the message contains an array of events
WSEventBatchType = fftypes.FFEnumValue("wstype", "event_batch")
)

// WSActionBase is the base fields of all client actions sent on the websocket
@@ -61,3 +64,12 @@ type WSError struct {
Type WSClientPayloadType `ffstruct:"WSAck" json:"type" ffenum:"wstype"`
Error string `ffstruct:"WSAck" json:"error"`
}

// WSEventBatch is used when batched delivery is enabled over the websocket, allowing
// an array of events to be ack'd as a whole (rather than ack'ing individually)
type WSEventBatch struct {
Type WSClientPayloadType `ffstruct:"WSEventBatch" json:"type" ffenum:"wstype"`
ID *fftypes.UUID `ffstruct:"WSEventBatch" json:"id"`
Subscription SubscriptionRef `ffstruct:"WSEventBatch" json:"subscription"`
Events []*EventDelivery `ffstruct:"WSEventBatch" json:"events"`
}
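
Finally, a sketch of the exchange this type enables (the `WSAck` shape is inferred from the wsack.md table above, and `core.WSClientActionAck` is assumed to be the existing `"ack"` enum value): the server delivers one `event_batch` frame, and the client acknowledges the entire batch with a single ack carrying the batch ID, which `handleBatchAck` then fans out into per-event acks:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/hyperledger/firefly-common/pkg/fftypes"
	"github.com/hyperledger/firefly/pkg/core"
)

func main() {
	// Server -> client: one frame carrying the whole batch of events
	batch := &core.WSEventBatch{
		Type:         core.WSEventBatchType,
		ID:           fftypes.NewUUID(),
		Subscription: core.SubscriptionRef{Namespace: "ns1", Name: "sub1"},
		Events:       []*core.EventDelivery{}, // populated by the dispatcher
	}
	// Client -> server: a single ack for the batch ID acks every event in it
	ack := &core.WSAck{
		WSActionBase: core.WSActionBase{Type: core.WSClientActionAck},
		ID:           batch.ID,
	}
	b, _ := json.Marshal(batch)
	a, _ := json.Marshal(ack)
	fmt.Println(string(b))
	fmt.Println(string(a))
}
```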