Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR 1424 and 1447 #100

Merged
merged 10 commits into from
Jan 19, 2024
7 changes: 4 additions & 3 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|firstEvent|The first event the aggregator should process, if no previous offset is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000`
Expand All @@ -249,7 +249,7 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

Expand Down Expand Up @@ -1356,7 +1356,8 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`0`
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50`
|batchTimeout|Default batch timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`50ms`

## subscription.retry

Expand Down
2 changes: 1 addition & 1 deletion docs/reference/types/wsack.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ nav_order: 24

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `id` | WSAck.id | [`UUID`](simpletypes#uuid) |
| `subscription` | WSAck.subscription | [`SubscriptionRef`](#subscriptionref) |

Expand Down
2 changes: 1 addition & 1 deletion docs/reference/types/wserror.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ nav_order: 25

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `error` | WSAck.error | `string` |

2 changes: 1 addition & 1 deletion docs/reference/types/wsstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ nav_order: 23

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `autoack` | WSStart.autoack | `bool` |
| `namespace` | WSStart.namespace | `string` |
| `name` | WSStart.name | `string` |
Expand Down
15 changes: 9 additions & 6 deletions internal/coreconfig/coreconfig.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -327,8 +327,10 @@ var (
OrgDescription = ffc("org.description")
// OrchestratorStartupAttempts is how many time to attempt to connect to core infrastructure on startup
OrchestratorStartupAttempts = ffc("orchestrator.startupAttempts")
// SubscriptionDefaultsReadAhead default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsReadAhead = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchSize default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsBatchSize = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchTimeout default batch timeout
SubscriptionDefaultsBatchTimeout = ffc("subscription.defaults.batchTimeout")
// SubscriptionMax maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)
SubscriptionMax = ffc("subscription.max")
// SubscriptionsRetryInitialDelay is the initial retry delay
Expand Down Expand Up @@ -404,7 +406,7 @@ func setDefaults() {
viper.SetDefault(string(DownloadRetryFactor), 2.0)
viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest)
viper.SetDefault(string(EventAggregatorBatchSize), 200)
viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms")
viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms")
viper.SetDefault(string(EventAggregatorPollTimeout), "30s")
viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms")
viper.SetDefault(string(EventAggregatorRewindQueueLength), 10)
Expand All @@ -414,7 +416,7 @@ func setDefaults() {
viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s")
viper.SetDefault(string(EventDBEventsBufferSize), 100)
viper.SetDefault(string(EventDispatcherBufferLength), 5)
viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms")
viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms")
viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
viper.SetDefault(string(EventTransportsDefault), "websockets")
Expand Down Expand Up @@ -451,7 +453,8 @@ func setDefaults() {
viper.SetDefault(string(PrivateMessagingBatchSize), 200)
viper.SetDefault(string(PrivateMessagingBatchTimeout), "1s")
viper.SetDefault(string(PrivateMessagingBatchPayloadLimit), "800Kb")
viper.SetDefault(string(SubscriptionDefaultsReadAhead), 0)
viper.SetDefault(string(SubscriptionDefaultsBatchSize), 50)
viper.SetDefault(string(SubscriptionDefaultsBatchTimeout), "50ms")
viper.SetDefault(string(SubscriptionMax), 500)
viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
Expand Down
7 changes: 4 additions & 3 deletions internal/coremsgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -383,8 +383,9 @@ var (
ConfigPluginSharedstorageIpfsGatewayURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType)
ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType)

ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType)

ConfigTokensName = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType)
ConfigTokensPlugin = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType)
Expand Down
153 changes: 69 additions & 84 deletions internal/events/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -65,25 +65,21 @@ type eventDispatcher struct {
elected bool
eventPoller *eventPoller
inflight map[fftypes.UUID]*core.Event
eventDelivery chan *core.EventDelivery
eventDelivery chan []*core.EventDelivery
mux sync.Mutex
namespace string
readAhead int
batch bool
batchTimeout time.Duration
subscription *subscription
txHelper txcommon.Helper
}

func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.Plugin, di database.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, connID string, sub *subscription, en *eventNotifier, txHelper txcommon.Helper) *eventDispatcher {
ctx, cancelCtx := context.WithCancel(ctx)
readAhead := config.GetUint(coreconfig.SubscriptionDefaultsReadAhead)
readAhead := uint(0)
if sub.definition.Options.ReadAhead != nil {
readAhead = uint(*sub.definition.Options.ReadAhead)
}
if readAhead > maxReadAhead {
readAhead = maxReadAhead
}

batchTimeout := defaultBatchTimeout
if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" {
Expand All @@ -108,13 +104,12 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
subscription: sub,
namespace: sub.definition.Namespace,
inflight: make(map[fftypes.UUID]*core.Event),
eventDelivery: make(chan *core.EventDelivery, readAhead+1),
eventDelivery: make(chan []*core.EventDelivery, readAhead+1),
readAhead: int(readAhead),
acksNacks: make(chan ackNack),
closed: make(chan struct{}),
txHelper: txHelper,
batch: batch,
batchTimeout: batchTimeout,
}

pollerConf := &eventPollerConf{
Expand All @@ -138,6 +133,20 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
firstEvent: sub.definition.Options.FirstEvent,
}

// Users can tune the batch related settings.
// This is always true in batch:true cases, and optionally you can use the batchTimeout setting
// to tweak how we optimize ourselves for readahead / latency detection without batching
// (most likely users with this requirement would be best to just move to batch:true).
if batchTimeout > 0 {
pollerConf.eventBatchTimeout = batchTimeout
if batchTimeout > pollerConf.eventPollTimeout {
pollerConf.eventPollTimeout = batchTimeout
}
}
if batch || pollerConf.eventBatchSize < int(readAhead) {
pollerConf.eventBatchSize = ed.readAhead
}

ed.eventPoller = newEventPoller(ctx, di, en, pollerConf)
return ed
}
Expand Down Expand Up @@ -165,11 +174,8 @@ func (ed *eventDispatcher) electAndStart() {
ed.elected = true
ed.eventPoller.start()

if ed.batch {
go ed.deliverBatchedEvents()
} else {
go ed.deliverEvents()
}
go ed.deliverEvents()

// Wait until the event poller closes
<-ed.eventPoller.closed
}
Expand Down Expand Up @@ -326,7 +332,15 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
ed.mux.Unlock()

dispatched++
ed.eventDelivery <- event
if !ed.batch {
// dispatch individually
ed.eventDelivery <- []*core.EventDelivery{event}
}
}

if ed.batch && len(dispatchable) > 0 {
// Dispatch the whole batch now marked in-flight
ed.eventDelivery <- dispatchable
}

if inflightCount == 0 {
Expand Down Expand Up @@ -384,88 +398,59 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
}
}

func (ed *eventDispatcher) deliverBatchedEvents() {
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData

var events []*core.CombinedEventDataDelivery
var batchTimeoutContext context.Context
var batchTimeoutCancel func()
for {
var timeoutContext context.Context
var timedOut bool
if batchTimeoutContext != nil {
timeoutContext = batchTimeoutContext
} else {
timeoutContext = ed.ctx
}
select {
case event, ok := <-ed.eventDelivery:
case events, ok := <-ed.eventDelivery:
if !ok {
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if events == nil {
events = []*core.CombinedEventDataDelivery{}
batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout)
}

log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)

var data []*core.Data
// As soon as we hit an error, we need to trigger into nack mode
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
}

events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data})

if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
}

case <-timeoutContext.Done():
timedOut = true
case <-ed.ctx.Done():
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if len(events) == ed.readAhead || (timedOut && len(events) > 0) {
_ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events)
// If err handle all the delivery responses for all the events??
events = nil
}
}
}

// TODO issue here, we can't just call DeliveryRequest with one thing.
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
for {
select {
case event, ok := <-ed.eventDelivery:
if !ok {
return
// Loop through the events enriching them, and dispatching individually in non-batch mode
eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
for i := 0; i < len(events); i++ {
e := &core.CombinedEventDataDelivery{
Event: events[i],
}
eventsWithData[i] = e
// The first error we encounter stops us attempting to enrich or dispatch any more events
if err == nil {
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
}
// If we are non-batched, we have to deliver each event individually...
if !ed.batch {
// .. only attempt to deliver if we've not triggered into an error scenario for one of the events already
if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
}
// ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
var data []*core.Data
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
// In batch mode we do one dispatch of the whole set as one
if ed.batch {
// Only attempt to deliver if we're in a non error case (enrich might have failed above)
if err == nil {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
}
// If we're in an error case we have to nack everything immediately
if err != nil {
for _, e := range events {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
}
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
}
case <-ed.ctx.Done():
return
}
Expand Down
Loading