diff --git a/docs/reference/config.md b/docs/reference/config.md
index 5e248953ea..15509b3601 100644
--- a/docs/reference/config.md
+++ b/docs/reference/config.md
@@ -224,7 +224,7 @@ nav_order: 2
 |Key|Description|Type|Default Value|
 |---|-----------|----|-------------|
 |batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`
-|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
+|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
 |firstEvent|The first event the aggregator should process, if no previous offest is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest`
 |pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
 |rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000`
@@ -249,7 +249,7 @@ nav_order: 2
 
 |Key|Description|Type|Default Value|
 |---|-----------|----|-------------|
-|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
+|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
 |bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5`
 |pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
 
@@ -1356,7 +1356,8 @@ nav_order: 2
 
 |Key|Description|Type|Default Value|
 |---|-----------|----|-------------|
-|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`0`
+|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50`
+|batchTimeout|Default batch timeout|`int`|`50ms`
 
 ## subscription.retry
 
diff --git a/docs/reference/types/wsack.md b/docs/reference/types/wsack.md
index 5b23853316..fc44ff2f07 100644
--- a/docs/reference/types/wsack.md
+++ b/docs/reference/types/wsack.md
@@ -37,7 +37,7 @@ nav_order: 24
 
 | Field Name | Description | Type |
 |------------|-------------|------|
-| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
+| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
 | `id` | WSAck.id | [`UUID`](simpletypes#uuid) |
 | `subscription` | WSAck.subscription | [`SubscriptionRef`](#subscriptionref) |
 
diff --git a/docs/reference/types/wserror.md b/docs/reference/types/wserror.md
index e7052b0a16..ed2cb06e1a 100644
--- a/docs/reference/types/wserror.md
+++ b/docs/reference/types/wserror.md
@@ -33,6 +33,6 @@ nav_order: 25
 
 | Field Name | Description | Type |
 |------------|-------------|------|
-| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
+| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
 | `error` | WSAck.error | `string` |
 
diff --git a/docs/reference/types/wsstart.md b/docs/reference/types/wsstart.md
index 3c606a79b9..f8f3798f77 100644
--- a/docs/reference/types/wsstart.md
+++ b/docs/reference/types/wsstart.md
@@ -42,7 +42,7 @@ nav_order: 23
 
 | Field Name | Description | Type |
 |------------|-------------|------|
-| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
+| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
 | `autoack` | WSStart.autoack | `bool` |
 | `namespace` | WSStart.namespace | `string` |
 | `name` | WSStart.name | `string` |
diff --git a/internal/coreconfig/coreconfig.go b/internal/coreconfig/coreconfig.go
index 1a8e904ece..6ae2b8a78e 100644
--- a/internal/coreconfig/coreconfig.go
+++ b/internal/coreconfig/coreconfig.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -327,8 +327,10 @@ var (
 	OrgDescription = ffc("org.description")
 	// OrchestratorStartupAttempts is how many time to attempt to connect to core infrastructure on startup
 	OrchestratorStartupAttempts = ffc("orchestrator.startupAttempts")
-	// SubscriptionDefaultsReadAhead default read ahead to enable for subscriptions that do not explicitly configure readahead
-	SubscriptionDefaultsReadAhead = ffc("subscription.defaults.batchSize")
+	// SubscriptionDefaultsBatchSize default read ahead to enable for subscriptions that do not explicitly configure readahead
+	SubscriptionDefaultsBatchSize = ffc("subscription.defaults.batchSize")
+	// SubscriptionDefaultsBatchTimeout default batch timeout
+	SubscriptionDefaultsBatchTimeout = ffc("subscription.defaults.batchTimeout")
 	// SubscriptionMax maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)
 	SubscriptionMax = ffc("subscription.max")
 	// SubscriptionsRetryInitialDelay is the initial retry delay
@@ -404,7 +406,7 @@ func setDefaults() {
 	viper.SetDefault(string(DownloadRetryFactor), 2.0)
 	viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest)
 	viper.SetDefault(string(EventAggregatorBatchSize), 200)
-	viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms")
+	viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms")
 	viper.SetDefault(string(EventAggregatorPollTimeout), "30s")
 	viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms")
 	viper.SetDefault(string(EventAggregatorRewindQueueLength), 10)
@@ -414,7 +416,7 @@ func setDefaults() {
 	viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s")
 	viper.SetDefault(string(EventDBEventsBufferSize), 100)
 	viper.SetDefault(string(EventDispatcherBufferLength), 5)
-	viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms")
+	viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms")
 	viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
 	viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
 	viper.SetDefault(string(EventTransportsDefault), "websockets")
@@ -451,7 +453,8 @@ func setDefaults() {
 	viper.SetDefault(string(PrivateMessagingBatchSize), 200)
 	viper.SetDefault(string(PrivateMessagingBatchTimeout), "1s")
 	viper.SetDefault(string(PrivateMessagingBatchPayloadLimit), "800Kb")
-	viper.SetDefault(string(SubscriptionDefaultsReadAhead), 0)
+	viper.SetDefault(string(SubscriptionDefaultsBatchSize), 50)
+	viper.SetDefault(string(SubscriptionDefaultsBatchTimeout), "50ms")
 	viper.SetDefault(string(SubscriptionMax), 500)
 	viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
 	viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
diff --git a/internal/coremsgs/en_config_descriptions.go b/internal/coremsgs/en_config_descriptions.go
index a148b5b2b9..b766b72b6d 100644
--- a/internal/coremsgs/en_config_descriptions.go
+++ b/internal/coremsgs/en_config_descriptions.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -383,8 +383,9 @@ var (
 	ConfigPluginSharedstorageIpfsGatewayURL      = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType)
 	ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType)
 
-	ConfigSubscriptionMax               = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
-	ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
+	ConfigSubscriptionMax                  = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
+	ConfigSubscriptionDefaultsBatchSize    = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
+	ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType)
 
 	ConfigTokensName     = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType)
 	ConfigTokensPlugin   = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType)
diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go
index a862d2dd57..8afd99e390 100644
--- a/internal/events/event_dispatcher.go
+++ b/internal/events/event_dispatcher.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -65,25 +65,21 @@ type eventDispatcher struct {
 	elected       bool
 	eventPoller   *eventPoller
 	inflight      map[fftypes.UUID]*core.Event
-	eventDelivery chan *core.EventDelivery
+	eventDelivery chan []*core.EventDelivery
 	mux           sync.Mutex
 	namespace     string
 	readAhead     int
 	batch         bool
-	batchTimeout  time.Duration
 	subscription  *subscription
 	txHelper      txcommon.Helper
 }
 
 func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.Plugin, di database.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, connID string, sub *subscription, en *eventNotifier, txHelper txcommon.Helper) *eventDispatcher {
 	ctx, cancelCtx := context.WithCancel(ctx)
-	readAhead := config.GetUint(coreconfig.SubscriptionDefaultsReadAhead)
+	readAhead := uint(0)
 	if sub.definition.Options.ReadAhead != nil {
 		readAhead = uint(*sub.definition.Options.ReadAhead)
 	}
-	if readAhead > maxReadAhead {
-		readAhead = maxReadAhead
-	}
 
 	batchTimeout := defaultBatchTimeout
 	if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" {
@@ -108,13 +104,12 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
 		subscription:  sub,
 		namespace:     sub.definition.Namespace,
 		inflight:      make(map[fftypes.UUID]*core.Event),
-		eventDelivery: make(chan *core.EventDelivery, readAhead+1),
+		eventDelivery: make(chan []*core.EventDelivery, readAhead+1),
 		readAhead:     int(readAhead),
 		acksNacks:     make(chan ackNack),
 		closed:        make(chan struct{}),
 		txHelper:      txHelper,
 		batch:         batch,
-		batchTimeout:  batchTimeout,
 	}
 
 	pollerConf := &eventPollerConf{
@@ -138,6 +133,20 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
 		firstEvent:       sub.definition.Options.FirstEvent,
 	}
 
+	// Users can tune the batch related settings.
+	// This is always true in batch:true cases, and optionally you can use the batchTimeout setting
+	// to tweak how we optimize ourselves for readahead / latency detection without batching
+	// (most likely users with this requirement would be best to just move to batch:true).
+	if batchTimeout > 0 {
+		pollerConf.eventBatchTimeout = batchTimeout
+		if batchTimeout > pollerConf.eventPollTimeout {
+			pollerConf.eventPollTimeout = batchTimeout
+		}
+	}
+	if batch || pollerConf.eventBatchSize < int(readAhead) {
+		pollerConf.eventBatchSize = ed.readAhead
+	}
+
 	ed.eventPoller = newEventPoller(ctx, di, en, pollerConf)
 	return ed
 }
@@ -165,11 +174,8 @@ func (ed *eventDispatcher) electAndStart() {
 	ed.elected = true
 	ed.eventPoller.start()
 
-	if ed.batch {
-		go ed.deliverBatchedEvents()
-	} else {
-		go ed.deliverEvents()
-	}
+	go ed.deliverEvents()
+
 	// Wait until the event poller closes
 	<-ed.eventPoller.closed
 }
@@ -326,7 +332,15 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
 			ed.mux.Unlock()
 
 			dispatched++
-			ed.eventDelivery <- event
+			if !ed.batch {
+				// dispatch individually
+				ed.eventDelivery <- []*core.EventDelivery{event}
+			}
+		}
+
+		if ed.batch && len(dispatchable) > 0 {
+			// Dispatch the whole batch now marked in-flight
+			ed.eventDelivery <- dispatchable
 		}
 
 		if inflightCount == 0 {
@@ -384,88 +398,59 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
 	}
 }
 
-func (ed *eventDispatcher) deliverBatchedEvents() {
+func (ed *eventDispatcher) deliverEvents() {
 	withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
-
-	var events []*core.CombinedEventDataDelivery
-	var batchTimeoutContext context.Context
-	var batchTimeoutCancel func()
 	for {
-		var timeoutContext context.Context
-		var timedOut bool
-		if batchTimeoutContext != nil {
-			timeoutContext = batchTimeoutContext
-		} else {
-			timeoutContext = ed.ctx
-		}
 		select {
-		case event, ok := <-ed.eventDelivery:
+		case events, ok := <-ed.eventDelivery:
 			if !ok {
-				if batchTimeoutCancel != nil {
-					batchTimeoutCancel()
-				}
 				return
 			}
 
-			if events == nil {
-				events = []*core.CombinedEventDataDelivery{}
-				batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout)
-			}
-
-			log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
-
-			var data []*core.Data
+			// As soon as we hit an error, we need to trigger into nack mode
 			var err error
-			if withData && event.Message != nil {
-				data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
-			}
-
-			events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data})
-
-			if err != nil {
-				ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
-			}
-
-		case <-timeoutContext.Done():
-			timedOut = true
-		case <-ed.ctx.Done():
-			if batchTimeoutCancel != nil {
-				batchTimeoutCancel()
-			}
-			return
-		}
 
-		if len(events) == ed.readAhead || (timedOut && len(events) > 0) {
-			_ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events)
-			// If err handle all the delivery responses for all the events??
-			events = nil
-		}
-	}
-}
-
-// TODO issue here, we can't just call DeliveryRequest with one thing.
-func (ed *eventDispatcher) deliverEvents() {
-	withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
-	for {
-		select {
-		case event, ok := <-ed.eventDelivery:
-			if !ok {
-				return
+			// Loop through the events enriching them, and dispatching individually in non-batch mode
+			eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
+			for i := 0; i < len(events); i++ {
+				e := &core.CombinedEventDataDelivery{
+					Event: events[i],
+				}
+				eventsWithData[i] = e
+				// The first error we encounter stops us attempting to enrich or dispatch any more events
+				if err == nil {
+					log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
+					if withData && e.Event.Message != nil {
+						e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
+					}
+				}
+				// If we are non-batched, we have to deliver each event individually...
+				if !ed.batch {
+					// .. only attempt to deliver if we've not triggered into an error scenario for one of the events already
+					if err == nil {
+						err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
+					}
+					// ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events
+					if err != nil {
+						ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
+					}
+				}
 			}
 
-			log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
-			var data []*core.Data
-			var err error
-			if withData && event.Message != nil {
-				data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
+			// In batch mode we do one dispatch of the whole set as one
+			if ed.batch {
+				// Only attempt to deliver if we're in a non error case (enrich might have failed above)
+				if err == nil {
+					err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
+				}
+				// If we're in an error case we have to nack everything immediately
+				if err != nil {
+					for _, e := range events {
+						ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
+					}
+				}
 			}
 
-			if err == nil {
-				err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
-			}
-			if err != nil {
-				ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
-			}
 		case <-ed.ctx.Done():
 			return
 		}
diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go
index 2f592c92db..a056c32518 100644
--- a/internal/events/event_dispatcher_test.go
+++ b/internal/events/event_dispatcher_test.go
@@ -23,7 +23,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/hyperledger/firefly-common/pkg/config"
 	"github.com/hyperledger/firefly-common/pkg/fftypes"
 	"github.com/hyperledger/firefly-common/pkg/log"
 	"github.com/hyperledger/firefly/internal/cache"
@@ -131,20 +130,6 @@ func TestEventDispatcherStartStopBatched(t *testing.T) {
 	ed.close()
 }
 
-func TestMaxReadAhead(t *testing.T) {
-	config.Set(coreconfig.SubscriptionDefaultsReadAhead, 65537)
-	ed, cancel := newTestEventDispatcher(&subscription{
-		dispatcherElection: make(chan bool, 1),
-		definition: &core.Subscription{
-			SubscriptionRef: core.SubscriptionRef{Namespace: "ns1", Name: "sub1"},
-			Ephemeral:       true,
-			Options:         core.SubscriptionOptions{},
-		},
-	})
-	defer cancel()
-	assert.Equal(t, int(65536), ed.readAhead)
-}
-
 func TestEventDispatcherLeaderElection(t *testing.T) {
 	log.SetLevel("debug")
 
@@ -377,6 +362,171 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) {
 	mdm.AssertExpectations(t)
 }
 
+func TestEventDispatcherBatchBased(t *testing.T) {
+	log.SetLevel("debug")
+	three := uint16(3)
+	longTime := "1m"
+	subID := fftypes.NewUUID()
+	truthy := true
+	sub := &subscription{
+		dispatcherElection: make(chan bool, 1),
+		definition: &core.Subscription{
+			SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
+			Options: core.SubscriptionOptions{
+				SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+					Batch:        &truthy,
+					ReadAhead:    &three,
+					BatchTimeout: &longTime, // because the batch should fill
+				},
+			},
+		},
+		eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
+	}
+
+	ed, cancel := newTestEventDispatcher(sub)
+	defer cancel()
+	go ed.deliverEvents()
+	ed.eventPoller.offsetCommitted = make(chan int64, 3)
+	mdi := ed.database.(*databasemocks.Plugin)
+	mei := ed.transport.(*eventsmocks.Plugin)
+	mdm := ed.data.(*datamocks.Manager)
+
+	eventDeliveries := make(chan []*core.CombinedEventDataDelivery)
+	deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	deliveryRequestMock.RunFn = func(a mock.Arguments) {
+		eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery)
+	}
+
+	// Setup the IDs
+	ref1 := fftypes.NewUUID()
+	ev1 := fftypes.NewUUID()
+	ref2 := fftypes.NewUUID()
+	ev2 := fftypes.NewUUID()
+	ref3 := fftypes.NewUUID()
+	ev3 := fftypes.NewUUID()
+	ref4 := fftypes.NewUUID()
+	ev4 := fftypes.NewUUID()
+
+	// Setup enrichment
+	mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{
+		Header: core.MessageHeader{ID: ref1},
+	}, nil, true, nil)
+	mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{
+		Header: core.MessageHeader{ID: ref2},
+	}, nil, true, nil)
+	mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{
+		Header: core.MessageHeader{ID: ref3},
+	}, nil, true, nil)
+	mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{
+		Header: core.MessageHeader{ID: ref4},
+	}, nil, true, nil)
+
+	// Deliver a batch of messages
+	batch1Done := make(chan struct{})
+	go func() {
+		repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
+			&core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match
+			&core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected},
+			&core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match
+			&core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match
+		})
+		assert.NoError(t, err)
+		assert.True(t, repoll)
+		close(batch1Done)
+	}()
+
+	// Expect to get the batch dispatched - with the three matching events
+	events := <-eventDeliveries
+	assert.Len(t, events, 3)
+	assert.Equal(t, *ev1, *events[0].Event.ID)
+	assert.Equal(t, *ref1, *events[0].Event.Message.Header.ID)
+	assert.Equal(t, *ev3, *events[1].Event.ID)
+	assert.Equal(t, *ref3, *events[1].Event.Message.Header.ID)
+	assert.Equal(t, *ev4, *events[2].Event.ID)
+	assert.Equal(t, *ref4, *events[2].Event.Message.Header.ID)
+
+	// Ack the batch
+	go func() {
+		ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[0].Event.ID})
+		ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[1].Event.ID})
+		ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[2].Event.ID})
+	}()
+
+	assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted)
+	assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted)
+	assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted)
+
+	// This should complete the batch
+	<-batch1Done
+
+	mdi.AssertExpectations(t)
+	mei.AssertExpectations(t)
+	mdm.AssertExpectations(t)
+}
+
+func TestEventDispatcherBatchDispatchFail(t *testing.T) {
+	log.SetLevel("debug")
+	two := uint16(2)
+	longTime := "1m"
+	subID := fftypes.NewUUID()
+	truthy := true
+	sub := &subscription{
+		dispatcherElection: make(chan bool, 1),
+		definition: &core.Subscription{
+			SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
+			Options: core.SubscriptionOptions{
+				SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+					Batch:        &truthy,
+					ReadAhead:    &two,
+					BatchTimeout: &longTime, // because the batch should fill
+				},
+			},
+		},
+		eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
+	}
+
+	ed, cancel := newTestEventDispatcher(sub)
+	defer cancel()
+	go ed.deliverEvents()
+	ed.eventPoller.offsetCommitted = make(chan int64, 3)
+	mdi := ed.database.(*databasemocks.Plugin)
+	mei := ed.transport.(*eventsmocks.Plugin)
+	mdm := ed.data.(*datamocks.Manager)
+
+	eventDeliveries := make(chan []*core.CombinedEventDataDelivery)
+	deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
+	deliveryRequestMock.RunFn = func(a mock.Arguments) {
+		eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery)
+	}
+
+	// Deliver a batch of messages
+	batch1Done := make(chan struct{})
+	go func() {
+		repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
+			&core.Event{ID: fftypes.NewUUID(), Sequence: 10000001, Type: core.EventTypeMessageConfirmed},
+			&core.Event{ID: fftypes.NewUUID(), Sequence: 10000002, Type: core.EventTypeMessageConfirmed},
+		})
+		assert.NoError(t, err)
+		assert.True(t, repoll)
+		close(batch1Done)
+	}()
+
+	mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(&core.Message{
+		Header: core.MessageHeader{ID: fftypes.NewUUID()},
+	}, nil, true, nil)
+
+	// Expect to get the batch dispatched - with the three matching events
+	events := <-eventDeliveries
+	assert.Len(t, events, 2)
+
+	// This should complete the batch
+	<-batch1Done
+
+	mdi.AssertExpectations(t)
+	mei.AssertExpectations(t)
+	mdm.AssertExpectations(t)
+}
+
 func TestEnrichEventsFailGetMessages(t *testing.T) {
 
 	sub := &subscription{
@@ -966,21 +1116,6 @@ func TestEventDeliveryClosed(t *testing.T) {
 	cancel()
 }
 
-func TestBatchEventDeliveryClosed(t *testing.T) {
-
-	sub := &subscription{
-		definition: &core.Subscription{},
-	}
-	ed, cancel := newTestEventDispatcher(sub)
-	defer cancel()
-
-	ed.batchTimeout = 1 * time.Minute
-	ed.eventDelivery <- &core.EventDelivery{}
-	close(ed.eventDelivery)
-
-	ed.deliverBatchedEvents()
-}
-
 func TestAckClosed(t *testing.T) {
 
 	sub := &subscription{
@@ -1039,17 +1174,19 @@ func TestDeliverEventsWithDataFail(t *testing.T) {
 	mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop"))
 
 	id1 := fftypes.NewUUID()
-	ed.eventDelivery <- &core.EventDelivery{
-		EnrichedEvent: core.EnrichedEvent{
-			Event: core.Event{
-				ID: id1,
-			},
-			Message: &core.Message{
-				Header: core.MessageHeader{
-					ID: fftypes.NewUUID(),
+	ed.eventDelivery <- []*core.EventDelivery{
+		{
+			EnrichedEvent: core.EnrichedEvent{
+				Event: core.Event{
+					ID: id1,
 				},
-				Data: core.DataRefs{
-					{ID: fftypes.NewUUID()},
+				Message: &core.Message{
+					Header: core.MessageHeader{
+						ID: fftypes.NewUUID(),
+					},
+					Data: core.DataRefs{
+						{ID: fftypes.NewUUID()},
+					},
 				},
 			},
 		},
@@ -1166,154 +1303,3 @@ func TestEventDeliveryBatch(t *testing.T) {
 	mbm.AssertExpectations(t)
 	mms.AssertExpectations(t)
 }
-
-func TestEventDispatcherBatchReadAhead(t *testing.T) {
-	log.SetLevel("debug")
-	var five = uint16(5)
-	subID := fftypes.NewUUID()
-	truthy := true
-	oneSec := "1s"
-	sub := &subscription{
-		dispatcherElection: make(chan bool, 1),
-		definition: &core.Subscription{
-			SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
-			Options: core.SubscriptionOptions{
-				SubscriptionCoreOptions: core.SubscriptionCoreOptions{
-					ReadAhead:    &five,
-					Batch:        &truthy,
-					BatchTimeout: &oneSec,
-				},
-			},
-		},
-		eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
-	}
-
-	ed, cancel := newTestEventDispatcher(sub)
-	defer cancel()
-	go ed.deliverBatchedEvents()
-	ed.eventPoller.offsetCommitted = make(chan int64, 3)
-	mdi := ed.database.(*databasemocks.Plugin)
-	mei := ed.transport.(*eventsmocks.Plugin)
-	mdm := ed.data.(*datamocks.Manager)
-
-	eventDeliveries := make(chan *core.EventDelivery)
-	deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil)
-	deliveryRequestMock.RunFn = func(a mock.Arguments) {
-		batchEvents := a.Get(3).([]*core.CombinedEventDataDelivery)
-		for _, event := range batchEvents {
-			eventDeliveries <- event.Event
-		}
-	}
-
-	// Setup the IDs
-	ref1 := fftypes.NewUUID()
-	ev1 := fftypes.NewUUID()
-	ref2 := fftypes.NewUUID()
-	ev2 := fftypes.NewUUID()
-	ref3 := fftypes.NewUUID()
-	ev3 := fftypes.NewUUID()
-	ref4 := fftypes.NewUUID()
-	ev4 := fftypes.NewUUID()
-
-	// Setup enrichment
-	mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{
-		Header: core.MessageHeader{ID: ref1},
-	}, nil, true, nil)
-	mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{
-		Header: core.MessageHeader{ID: ref2},
-	}, nil, true, nil)
-	mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{
-		Header: core.MessageHeader{ID: ref3},
-	}, nil, true, nil)
-	mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{
-		Header: core.MessageHeader{ID: ref4},
-	}, nil, true, nil)
-
-	// Deliver a batch of messages
-	batch1Done := make(chan struct{})
-	go func() {
-		repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
-			&core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match
-			&core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected},
-			&core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match
-			&core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match
-		})
-		assert.NoError(t, err)
-		assert.True(t, repoll)
-		close(batch1Done)
-	}()
-
-	// Wait for the two calls to deliver the matching messages to the client (read ahead allows this)
-	event1 := <-eventDeliveries
-	assert.Equal(t, *ev1, *event1.ID)
-	assert.Equal(t, *ref1, *event1.Message.Header.ID)
-	event3 := <-eventDeliveries
-	assert.Equal(t, *ev3, *event3.ID)
-	assert.Equal(t, *ref3, *event3.Message.Header.ID)
-	event4 := <-eventDeliveries
-	assert.Equal(t, *ev4, *event4.ID)
-	assert.Equal(t, *ref4, *event4.Message.Header.ID)
-
-	// Send back the two acks - out of order to validate the read-ahead logic
-	go func() {
-		ed.deliveryResponse(&core.EventDeliveryResponse{ID: event4.ID})
-		ed.deliveryResponse(&core.EventDeliveryResponse{ID: event1.ID})
-		ed.deliveryResponse(&core.EventDeliveryResponse{ID: event3.ID})
-	}()
-
-	// Confirm we get the offset updates in the correct order, even though the confirmations
-	// came in a different order from the app.
-	assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted)
-	assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted)
-	assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted)
-
-	// This should complete the batch
-	<-batch1Done
-
-	mdi.AssertExpectations(t)
-	mei.AssertExpectations(t)
-	mdm.AssertExpectations(t)
-}
-
-func TestBatchDeliverEventsWithDataFail(t *testing.T) {
-	yes := true
-	sub := &subscription{
-		definition: &core.Subscription{
-			Options: core.SubscriptionOptions{
-				SubscriptionCoreOptions: core.SubscriptionCoreOptions{
-					WithData: &yes,
-				},
-			},
-		},
-	}
-
-	ed, cancel := newTestEventDispatcher(sub)
-	defer cancel()
-
-	mdm := ed.data.(*datamocks.Manager)
-	mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop"))
-
-	id1 := fftypes.NewUUID()
-	ed.eventDelivery <- &core.EventDelivery{
-		EnrichedEvent: core.EnrichedEvent{
-			Event: core.Event{
-				ID: id1,
-			},
-			Message: &core.Message{
-				Header: core.MessageHeader{
-					ID: fftypes.NewUUID(),
-				},
-				Data: core.DataRefs{
-					{ID: fftypes.NewUUID()},
-				},
-			},
-		},
-	}
-
-	ed.inflight[*id1] = &core.Event{ID: id1}
-	go ed.deliverBatchedEvents()
-
-	an := <-ed.acksNacks
-	assert.True(t, an.isNack)
-
-}
diff --git a/internal/events/event_poller.go b/internal/events/event_poller.go
index 9d876b6c8b..cd509cc204 100644
--- a/internal/events/event_poller.go
+++ b/internal/events/event_poller.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -196,7 +196,12 @@ func (ep *eventPoller) eventLoop() {
 		close(ep.offsetCommitted)
 	}()
 
+	doBatchDelay := false
 	for {
+		if doBatchDelay {
+			ep.waitForBatchTimeout()
+		}
+
 		// Read messages from the DB - in an error condition we retry until success, or a closed context
 		events, err := ep.readPage()
 		if err != nil {
@@ -205,6 +210,15 @@ func (ep *eventPoller) eventLoop() {
 		}
 
 		eventCount := len(events)
+
+		// We might want to wait for the batch to fill - so we delay and re-poll
+		if ep.conf.eventBatchTimeout > 0 && !doBatchDelay && eventCount < ep.conf.eventBatchSize {
+			doBatchDelay = true
+			l.Tracef("Batch delay: detected=%d, batchSize=%d batchTimeout=%s", eventCount, ep.conf.eventBatchSize, ep.conf.eventBatchTimeout)
+			continue
+		}
+		doBatchDelay = false // clear any batch delay for next iteration
+
 		repoll := false
 		if eventCount > 0 {
 			// We process all the events in the page in a single database run group, and
@@ -280,6 +294,16 @@ func (ep *eventPoller) shoulderTap() {
 	}
 }
 
+func (ep *eventPoller) waitForBatchTimeout() {
+	// For throughput optimized environments, we can set an eventBatchingTimeout to allow
+	// dispatching of incomplete batches at a shorter timeout than the
+	// long timeout between polling cycles (at the cost of some dispatch latency)
+	select {
+	case <-time.After(ep.conf.eventBatchTimeout):
+	case <-ep.ctx.Done():
+	}
+}
+
 func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool {
 	l := log.L(ep.ctx)
 	longTimeoutDuration := ep.conf.eventPollTimeout
@@ -289,20 +313,6 @@ func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool
 		return true
 	}
 
-	// For throughput optimized environments, we can set an eventBatchingTimeout to allow messages to arrive
-	// between polling cycles (at the cost of some dispatch latency)
-	if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 {
-		shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout)
-		select {
-		case <-shortTimeout.C:
-			l.Tracef("Woken after batch timeout")
-		case <-ep.ctx.Done():
-			l.Debugf("Exiting due to cancelled context")
-			return false
-		}
-		longTimeoutDuration -= ep.conf.eventBatchTimeout
-	}
-
 	longTimeout := time.NewTimer(longTimeoutDuration)
 	select {
 	case <-longTimeout.C:
diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go
index 8215d2c8aa..de080a5a2e 100644
--- a/internal/events/event_poller_test.go
+++ b/internal/events/event_poller_test.go
@@ -36,7 +36,7 @@ func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHa
 	ctx, cancel := context.WithCancel(context.Background())
 	ep = newEventPoller(ctx, mdi, newEventNotifier(ctx, "ut"), &eventPollerConf{
 		eventBatchSize:             10,
-		eventBatchTimeout:          1 * time.Millisecond,
+		eventBatchTimeout:          0, // customized for individual tests that enable this
 		eventPollTimeout:           10 * time.Second,
 		startupOffsetRetryAttempts: 1,
 		retry: retry.Retry{
@@ -219,6 +219,57 @@ func TestReadPageSingleCommitEvent(t *testing.T) {
 	mdi.AssertExpectations(t)
 }
 
+func TestReadPageBatchTimeoutNotFull(t *testing.T) {
+	mdi := &databasemocks.Plugin{}
+	processEventCalled := make(chan []core.LocallySequenced, 1)
+	ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
+		processEventCalled <- events
+		return false, nil
+	}, nil)
+	ep.conf.eventBatchTimeout = 1 * time.Microsecond
+	ep.conf.eventBatchSize = 3
+	ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+	ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+	mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1}, nil, nil).Once() // half batch
+	mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
+		ep.shoulderTap()
+	}).Once()
+	mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
+		cancel()
+	})
+	ep.eventLoop()
+
+	events := <-processEventCalled
+	assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
+	assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
+	mdi.AssertExpectations(t)
+}
+
+func TestReadPageBatchFull(t *testing.T) {
+	mdi := &databasemocks.Plugin{}
+	processEventCalled := make(chan []core.LocallySequenced, 1)
+	ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
+		processEventCalled <- events
+		return false, nil
+	}, nil)
+	ep.conf.eventBatchTimeout = 1 * time.Microsecond
+	ep.conf.eventBatchSize = 2
+	ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+	ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+	mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
+		ep.shoulderTap()
+	}).Once()
+	mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
+		cancel()
+	})
+	ep.eventLoop()
+
+	events := <-processEventCalled
+	assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
+	assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
+	mdi.AssertExpectations(t)
+}
+
 func TestReadPageRewind(t *testing.T) {
 	mdi := &databasemocks.Plugin{}
 	processEventCalled := make(chan core.LocallySequenced, 1)
@@ -325,6 +376,14 @@ func TestDoubleTap(t *testing.T) {
 	ep.shoulderTap() // this should not block
 }
 
+func TestWaitForBatchTimeoutClosedContext(t *testing.T) {
+	mdi := &databasemocks.Plugin{}
+	ep, cancel := newTestEventPoller(t, mdi, nil, nil)
+	ep.conf.eventBatchTimeout = 1 * time.Minute
+	cancel()
+	ep.waitForBatchTimeout()
+}
+
 func TestDoubleConfirm(t *testing.T) {
 	mdi := &databasemocks.Plugin{}
 	ep, cancel := newTestEventPoller(t, mdi, nil, nil)
diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go
index 788e72ec31..928e3789ec 100644
--- a/internal/events/subscription_manager.go
+++ b/internal/events/subscription_manager.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -20,6 +20,7 @@ import (
 	"context"
 	"regexp"
 	"sync"
+	"time"
 
 	"github.com/hyperledger/firefly-common/pkg/config"
 	"github.com/hyperledger/firefly-common/pkg/fftypes"
@@ -90,6 +91,9 @@ type subscriptionManager struct {
 	newOrUpdatedSubscriptions chan *fftypes.UUID
 	deletedSubscriptions      chan *fftypes.UUID
 	retry                     retry.Retry
+
+	defaultBatchSize    uint16
+	defaultBatchTimeout time.Duration
 }
 
 func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *eventEnricher, di database.Plugin, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager, txHelper txcommon.Helper, transports map[string]events.Plugin) (*subscriptionManager, error) {
@@ -116,6 +120,8 @@ func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *e
 			MaximumDelay: config.GetDuration(coreconfig.SubscriptionsRetryMaxDelay),
 			Factor:       config.GetFloat64(coreconfig.SubscriptionsRetryFactor),
 		},
+		defaultBatchSize:    uint16(config.GetInt(coreconfig.SubscriptionDefaultsBatchSize)),
+		defaultBatchTimeout: config.GetDuration(coreconfig.SubscriptionDefaultsBatchTimeout),
 	}
 
 	for _, ei := range sm.transports {
@@ -270,6 +276,18 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef
 		subDef.Options.TLSConfig = sm.namespace.TLSConfigs[subDef.Options.TLSConfigName]
 	}
 
+	// Defaults that only apply in batch mode
+	if subDef.Options.Batch != nil && *subDef.Options.Batch {
+		if subDef.Options.ReadAhead == nil || *subDef.Options.ReadAhead == 0 {
+			defaultBatchSize := sm.defaultBatchSize
+			subDef.Options.ReadAhead = &defaultBatchSize
+		}
+		if subDef.Options.BatchTimeout == nil || *subDef.Options.BatchTimeout == "" {
+			defaultBatchTimeout := sm.defaultBatchTimeout.String()
+			subDef.Options.BatchTimeout = &defaultBatchTimeout
+		}
+	}
+
 	if err := transport.ValidateOptions(ctx, &subDef.Options); err != nil {
 		return nil, err
 	}
diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go
index d72b8d1116..ef509c7057 100644
--- a/internal/events/subscription_manager_test.go
+++ b/internal/events/subscription_manager_test.go
@@ -551,6 +551,30 @@ func TestCreateSubscriptionSuccessTLSConfig(t *testing.T) {
 	assert.NotNil(t, sub.definition.Options.TLSConfig)
 }
 
+func TestCreateSubscriptionSuccessBatch(t *testing.T) {
+	coreconfig.Reset()
+
+	mei := &eventsmocks.Plugin{}
+	sm, cancel := newTestSubManager(t, mei)
+	defer cancel()
+
+	mei.On("GetFFRestyConfig", mock.Anything).Return(&ffresty.Config{})
+	mei.On("ValidateOptions", mock.Anything, mock.Anything).Return(nil)
+	truthy := true
+	sub, err := sm.parseSubscriptionDef(sm.ctx, &core.Subscription{
+		Options: core.SubscriptionOptions{
+			SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+				Batch: &truthy,
+			},
+		},
+		Transport: "ut",
+	})
+	assert.NoError(t, err)
+
+	assert.Equal(t, uint16(50), *sub.definition.Options.ReadAhead)
+	assert.Equal(t, "50ms", *sub.definition.Options.BatchTimeout)
+}
+
 func TestCreateSubscriptionWithDeprecatedFilters(t *testing.T) {
 	mei := &eventsmocks.Plugin{}
 	sm, cancel := newTestSubManager(t, mei)
diff --git a/internal/events/websockets/config.go b/internal/events/websockets/config.go
index 8ce482a87d..c5d8df776a 100644
--- a/internal/events/websockets/config.go
+++ b/internal/events/websockets/config.go
@@ -1,4 +1,4 @@
-// Copyright © 2022 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -32,5 +32,4 @@ const (
 func (ws *WebSockets) InitConfig(config config.Section) {
 	config.AddKnownKey(ReadBufferSize, bufferSizeDefault)
 	config.AddKnownKey(WriteBufferSize, bufferSizeDefault)
-
 }
diff --git a/internal/events/websockets/websocket_connection.go b/internal/events/websockets/websocket_connection.go
index e72d1eff40..58de422fa0 100644
--- a/internal/events/websockets/websocket_connection.go
+++ b/internal/events/websockets/websocket_connection.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -21,6 +21,8 @@ import (
 	"encoding/json"
 	"io"
 	"net/http"
+	"net/url"
+	"strconv"
 	"sync"
 	"time"
 
@@ -49,6 +51,7 @@ type websocketConnection struct {
 	autoAck         bool
 	started         []*websocketStartedSub
 	inflight        []*core.EventDeliveryResponse
+	inflightBatches []*core.WSEventBatch
 	mux             sync.Mutex
 	closed          bool
 	remoteAddr      string
@@ -94,21 +97,45 @@ func (wc *websocketConnection) assertNamespace(namespace string) (string, error)
 	return namespace, nil
 }
 
+func isBoolQuerySet(query url.Values, boolOption string) bool {
+	optionValues, hasOptionValues := query[boolOption]
+	return hasOptionValues && (len(optionValues) == 0 || optionValues[0] != "false")
+}
+
+func (wc *websocketConnection) getReadAhead(query url.Values, isBatch bool) *uint16 {
+	readaheadStr := query.Get("readahead")
+	if readaheadStr != "" {
+		readAheadInt, err := strconv.ParseUint(readaheadStr, 10, 16)
+		if err == nil {
+			readahead := uint16(readAheadInt)
+			return &readahead
+		}
+	}
+	return nil
+}
+
+func (wc *websocketConnection) getBatchTimeout(query url.Values) *string {
+	batchTimeout := query.Get("batchtimeout")
+	if batchTimeout != "" {
+		return &batchTimeout
+	}
+	return nil
+}
+
 // processAutoStart gives a helper to specify query parameters to auto-start your subscription
 func (wc *websocketConnection) processAutoStart(req *http.Request) {
 	query := req.URL.Query()
-	ephemeral, hasEphemeral := req.URL.Query()["ephemeral"]
-	isEphemeral := hasEphemeral && (len(ephemeral) == 0 || ephemeral[0] != "false")
+	isEphemeral := isBoolQuerySet(query, "ephemeral")
 	_, hasName := query["name"]
-	autoAck, hasAutoack := req.URL.Query()["autoack"]
-	isAutoack := hasAutoack && (len(autoAck) == 0 || autoAck[0] != "false")
+	isAutoack := isBoolQuerySet(query, "autoack")
 	namespace, err := wc.assertNamespace(query.Get("namespace"))
 	if err != nil {
 		wc.protocolError(err)
 		return
 	}
 
-	if hasEphemeral || hasName {
+	if isEphemeral || hasName {
+		isBatch := isBoolQuerySet(query, "batch")
 		filter := core.NewSubscriptionFilterFromQuery(query)
 		err := wc.handleStart(&core.WSStart{
 			AutoAck:   &isAutoack,
@@ -116,6 +143,13 @@ func (wc *websocketConnection) processAutoStart(req *http.Request) {
 			Namespace: namespace,
 			Name:      query.Get("name"),
 			Filter:    filter,
+			Options: core.SubscriptionOptions{
+				SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+					Batch:        &isBatch,
+					BatchTimeout: wc.getBatchTimeout(query),
+					ReadAhead:    wc.getReadAhead(query, isBatch),
+				},
+			},
 		})
 		if err != nil {
 			wc.protocolError(err)
@@ -231,6 +265,54 @@ func (wc *websocketConnection) dispatch(event *core.EventDelivery) error {
 	return nil
 }
 
+func (wc *websocketConnection) dispatchBatch(sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
+	inflightBatch := &core.WSEventBatch{
+		Type:   core.WSEventBatchType,
+		ID:     fftypes.NewUUID(),
+		Events: make([]*core.EventDelivery, len(events)),
+	}
+	if sub != nil {
+		inflightBatch.Subscription = sub.SubscriptionRef
+	}
+	for i, e := range events {
+		// For ephemeral there's no sub, so we pick up from first event
+		if inflightBatch.Subscription.Namespace == "" {
+			inflightBatch.Subscription = e.Event.Subscription
+		}
+		inflightBatch.Events[i] = e.Event
+	}
+
+	var autoAck bool
+	wc.mux.Lock()
+	autoAck = wc.autoAck
+	if !autoAck {
+		wc.inflightBatches = append(wc.inflightBatches, inflightBatch)
+	}
+	wc.mux.Unlock()
+
+	err := wc.send(inflightBatch)
+	if err != nil {
+		return err
+	}
+
+	if autoAck {
+		wc.ackBatch(inflightBatch)
+	}
+
+	return nil
+}
+
+func (wc *websocketConnection) ackBatch(batch *core.WSEventBatch) {
+	for _, e := range batch.Events {
+		// We individually drive an ack back on each event, but do so in one pass
+		// (this matches the webhook implementation of batching).
+		wc.ws.ack(wc.connID, &core.EventDeliveryResponse{
+			ID:           e.ID,
+			Subscription: batch.Subscription,
+		})
+	}
+}
+
 func (wc *websocketConnection) protocolError(err error) {
 	log.L(wc.ctx).Errorf("Sending protocol error to client: %s", err)
 	sendErr := wc.send(&core.WSError{
@@ -309,6 +391,22 @@ func (wc *websocketConnection) durableSubMatcher(sr core.SubscriptionRef) bool {
 	return false
 }
 
+func (wc *websocketConnection) handleBatchAck(ack *core.WSAck) (handled bool) {
+	wc.mux.Lock()
+	defer wc.mux.Unlock()
+	var newInflightBatches []*core.WSEventBatch
+	for _, batch := range wc.inflightBatches {
+		if batch.ID.Equals(ack.ID) { // nil safe check
+			wc.ackBatch(batch)
+			handled = true
+		} else {
+			newInflightBatches = append(newInflightBatches, batch)
+		}
+	}
+	wc.inflightBatches = newInflightBatches
+	return handled
+}
+
 func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryResponse, error) {
 	l := log.L(wc.ctx)
 	var inflight *core.EventDeliveryResponse
@@ -363,6 +461,10 @@ func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryRes
 }
 
 func (wc *websocketConnection) handleAck(ack *core.WSAck) error {
+	if handled := wc.handleBatchAck(ack); handled {
+		return nil
+	}
+
 	// Perform a locked set of check
 	inflight, err := wc.checkAck(ack)
 	if err != nil {
diff --git a/internal/events/websockets/websockets.go b/internal/events/websockets/websockets.go
index a12a325dd0..fe6dc9bf0e 100644
--- a/internal/events/websockets/websockets.go
+++ b/internal/events/websockets/websockets.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -54,9 +54,11 @@ func (ws *WebSockets) Name() string { return "websockets" }
 
 func (ws *WebSockets) Init(ctx context.Context, config config.Section) error {
 	*ws = WebSockets{
-		ctx:          ctx,
-		connections:  make(map[string]*websocketConnection),
-		capabilities: &events.Capabilities{},
+		ctx:         ctx,
+		connections: make(map[string]*websocketConnection),
+		capabilities: &events.Capabilities{
+			BatchDelivery: true,
+		},
 		callbacks: callbacks{
 			handlers: make(map[string]events.Callbacks),
 		},
@@ -242,6 +244,11 @@ func (ws *WebSockets) GetStatus() *core.WebSocketStatus {
 }
 
 func (ws *WebSockets) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
-	// We should have rejected creation of the subscription, due to us not supporting this in our capabilities
-	return i18n.NewError(ctx, coremsgs.MsgBatchDeliveryNotSupported, ws.Name())
+	ws.connMux.Lock()
+	conn, ok := ws.connections[connID]
+	ws.connMux.Unlock()
+	if !ok {
+		return i18n.NewError(ctx, coremsgs.MsgWSConnectionNotActive, connID)
+	}
+	return conn.dispatchBatch(sub, events)
 }
diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go
index e2f837988b..cba25ebfad 100644
--- a/internal/events/websockets/websockets_test.go
+++ b/internal/events/websockets/websockets_test.go
@@ -236,6 +236,68 @@ func TestStartReceiveAckEphemeral(t *testing.T) {
 	cbs.AssertExpectations(t)
 }
 
+func TestAutoAckBatch(t *testing.T) {
+	log.SetLevel("trace")
+
+	cbs := &eventsmocks.Callbacks{}
+	ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "autoack=true")
+	defer cancel()
+	var connID string
+	mes := cbs.On("EphemeralSubscription",
+		mock.MatchedBy(func(s string) bool { connID = s; return true }),
+		"ns1", mock.Anything, mock.MatchedBy(func(o *core.SubscriptionOptions) bool {
+			return *o.Batch
+		})).Return(nil)
+	ack := cbs.On("DeliveryResponse",
+		mock.MatchedBy(func(s string) bool { return s == connID }),
+		mock.Anything).Return(nil)
+
+	waitSubscribed := make(chan struct{})
+	mes.RunFn = func(a mock.Arguments) {
+		close(waitSubscribed)
+	}
+
+	waitAcked := make(chan struct{})
+	ack.RunFn = func(a mock.Arguments) {
+		close(waitAcked)
+	}
+
+	err := wsc.Send(context.Background(), []byte(`{
+		"type":"start",
+		"namespace":"ns1",
+		"ephemeral":true,
+		"autoack": true,
+		"options": {
+			"batch": true
+		}
+	}`))
+	assert.NoError(t, err)
+
+	<-waitSubscribed
+	sub := &core.Subscription{
+		SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"},
+	}
+	ws.BatchDeliveryRequest(ws.ctx, connID, sub, []*core.CombinedEventDataDelivery{
+		{Event: &core.EventDelivery{
+			EnrichedEvent: core.EnrichedEvent{
+				Event: core.Event{ID: fftypes.NewUUID()},
+			},
+			Subscription: core.SubscriptionRef{
+				ID:        fftypes.NewUUID(),
+				Namespace: "ns1",
+			},
+		}},
+	})
+
+	b := <-wsc.Receive()
+	var res core.EventDelivery
+	err = json.Unmarshal(b, &res)
+	assert.NoError(t, err)
+
+	<-waitAcked
+	cbs.AssertExpectations(t)
+}
+
 func TestStartReceiveDurable(t *testing.T) {
 	cbs := &eventsmocks.Callbacks{}
 	ws, wsc, cancel := newTestWebsockets(t, cbs, nil)
@@ -243,12 +305,13 @@ func TestStartReceiveDurable(t *testing.T) {
 	var connID string
 	sub := cbs.On("RegisterConnection",
 		mock.MatchedBy(func(s string) bool { connID = s; return true }),
-		mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
-			return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) &&
-				!subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
-				!subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
-		}),
-	).Return(nil)
+		mock.Anything,
+	).Return(nil).Run(func(args mock.Arguments) {
+		subMatch := args[1].(events.SubscriptionMatcher)
+		assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+	})
 	ack := cbs.On("DeliveryResponse",
 		mock.MatchedBy(func(s string) bool { return s == connID }),
 		mock.Anything).Return(nil)
@@ -316,28 +379,107 @@ func TestStartReceiveDurable(t *testing.T) {
 	cbs.AssertExpectations(t)
 }
 
-func TestStartReceiveDurableWithAuth(t *testing.T) {
+func TestStartReceiveDurableBatch(t *testing.T) {
 	cbs := &eventsmocks.Callbacks{}
-	ws, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{})
+	ws, wsc, cancel := newTestWebsockets(t, cbs, nil)
 	defer cancel()
 	var connID string
-	sub := cbs.On("RegisterConnection",
+	mrg := cbs.On("RegisterConnection",
 		mock.MatchedBy(func(s string) bool { connID = s; return true }),
-		mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
-			return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) &&
-				!subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
-				!subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
-		}),
-	).Return(nil)
+		mock.Anything,
+	).Return(nil).Run(func(args mock.Arguments) {
+		subMatch := args[1].(events.SubscriptionMatcher)
+		assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+	})
 	ack := cbs.On("DeliveryResponse",
 		mock.MatchedBy(func(s string) bool { return s == connID }),
 		mock.Anything).Return(nil)
 
 	waitSubscribed := make(chan struct{})
-	sub.RunFn = func(a mock.Arguments) {
+	mrg.RunFn = func(a mock.Arguments) {
 		close(waitSubscribed)
 	}
 
+	acks := make(chan *core.EventDeliveryResponse)
+	ack.RunFn = func(a mock.Arguments) {
+		acks <- a[1].(*core.EventDeliveryResponse)
+	}
+
+	err := wsc.Send(context.Background(), []byte(`{"type":"start","namespace":"ns1","name":"sub1"}`))
+	assert.NoError(t, err)
+
+	<-waitSubscribed
+	sub := &core.Subscription{
+		SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"},
+	}
+	event1ID := fftypes.NewUUID()
+	event2ID := fftypes.NewUUID()
+	ws.BatchDeliveryRequest(ws.ctx, connID, sub, []*core.CombinedEventDataDelivery{
+		{
+			Event: &core.EventDelivery{
+				EnrichedEvent: core.EnrichedEvent{
+					Event: core.Event{ID: event1ID},
+				},
+				Subscription: sub.SubscriptionRef,
+			},
+		},
+		{
+			Event: &core.EventDelivery{
+				EnrichedEvent: core.EnrichedEvent{
+					Event: core.Event{ID: event2ID},
+				},
+				Subscription: sub.SubscriptionRef,
+			},
+		},
+	})
+
+	b := <-wsc.Receive()
+	var deliveredBatch core.WSEventBatch
+	err = json.Unmarshal(b, &deliveredBatch)
+	assert.NoError(t, err)
+	assert.Len(t, deliveredBatch.Events, 2)
+	assert.Equal(t, "ns1", deliveredBatch.Subscription.Namespace)
+	assert.Equal(t, "sub1", deliveredBatch.Subscription.Name)
+	err = wsc.Send(context.Background(), []byte(fmt.Sprintf(`{
+		"type":"ack",
+		"id": "%s",
+		"subscription": {
+			"namespace": "ns1",
+			"name": "sub1"
+		}
+	}`, deliveredBatch.ID)))
+	assert.NoError(t, err)
+
+	ack1 := <-acks
+	assert.Equal(t, *event1ID, *ack1.ID)
+	ack2 := <-acks
+	assert.Equal(t, *event2ID, *ack2.ID)
+
+	cbs.AssertExpectations(t)
+}
+
+func TestStartReceiveDurableWithAuth(t *testing.T) {
+	cbs := &eventsmocks.Callbacks{}
+	ws, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{})
+	defer cancel()
+	var connID string
+	waitSubscribed := make(chan struct{})
+	cbs.On("RegisterConnection",
+		mock.MatchedBy(func(s string) bool { connID = s; return true }),
+		mock.Anything,
+	).Run(func(args mock.Arguments) {
+		subMatch := args[1].(events.SubscriptionMatcher)
+		assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+		close(waitSubscribed)
+	}).Return(nil)
+	ack := cbs.On("DeliveryResponse",
+		mock.MatchedBy(func(s string) bool { return s == connID }),
+		mock.Anything).Return(nil)
+
 	waitAcked := make(chan struct{})
 	ack.RunFn = func(a mock.Arguments) {
 		close(waitAcked)
@@ -401,22 +543,20 @@ func TestStartReceiveDurableUnauthorized(t *testing.T) {
 	_, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{})
 	defer cancel()
 	var connID string
-	sub := cbs.On("RegisterConnection",
+	waitSubscribed := make(chan struct{})
+	cbs.On("RegisterConnection",
 		mock.MatchedBy(func(s string) bool { connID = s; return true }),
-		mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
-			return subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
-				!subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
-		}),
-	).Return(nil)
+		mock.Anything,
+	).Return(nil).Run(func(args mock.Arguments) {
+		subMatch := args[1].(events.SubscriptionMatcher)
+		assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+		close(waitSubscribed)
+	})
 	ack := cbs.On("DeliveryResponse",
 		mock.MatchedBy(func(s string) bool { return s == connID }),
 		mock.Anything).Return(nil)
 
-	waitSubscribed := make(chan struct{})
-	sub.RunFn = func(a mock.Arguments) {
-		close(waitSubscribed)
-	}
-
 	waitAcked := make(chan struct{})
 	ack.RunFn = func(a mock.Arguments) {
 		close(waitAcked)
@@ -435,18 +575,18 @@ func TestStartReceiveDurableUnauthorized(t *testing.T) {
 func TestAutoStartReceiveAckEphemeral(t *testing.T) {
 	var connID string
 	cbs := &eventsmocks.Callbacks{}
-	sub := cbs.On("EphemeralSubscription",
+	waitSubscribed := make(chan struct{})
+	cbs.On("EphemeralSubscription",
 		mock.MatchedBy(func(s string) bool { connID = s; return true }),
-		"ns1", mock.Anything, mock.Anything).Return(nil)
+		"ns1", mock.Anything, mock.Anything).
+		Return(nil).
+		Run(func(args mock.Arguments) {
+			close(waitSubscribed)
+		})
 	ack := cbs.On("DeliveryResponse",
 		mock.MatchedBy(func(s string) bool { return s == connID }),
 		mock.Anything).Return(nil)
 
-	waitSubscribed := make(chan struct{})
-	sub.RunFn = func(a mock.Arguments) {
-		close(waitSubscribed)
-	}
-
 	waitAcked := make(chan struct{})
 	ack.RunFn = func(a mock.Arguments) {
 		close(waitAcked)
@@ -478,6 +618,55 @@ func TestAutoStartReceiveAckEphemeral(t *testing.T) {
 	cbs.AssertExpectations(t)
 }
 
+func TestAutoStartReceiveAckBatchEphemeral(t *testing.T) {
+	var connID string
+	cbs := &eventsmocks.Callbacks{}
+	waitSubscribed := make(chan struct{})
+	cbs.On("EphemeralSubscription",
+		mock.MatchedBy(func(s string) bool { connID = s; return true }),
+		"ns1", mock.Anything, mock.Anything).
+		Return(nil).
+		Run(func(args mock.Arguments) {
+			close(waitSubscribed)
+		})
+	ack := cbs.On("DeliveryResponse",
+		mock.MatchedBy(func(s string) bool { return s == connID }),
+		mock.Anything).Return(nil)
+
+	waitAcked := make(chan struct{})
+	ack.RunFn = func(a mock.Arguments) {
+		close(waitAcked)
+	}
+
+	ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "ephemeral", "namespace=ns1", "batch")
+	defer cancel()
+
+	<-waitSubscribed
+	ws.BatchDeliveryRequest(ws.ctx, connID, nil, []*core.CombinedEventDataDelivery{
+		{Event: &core.EventDelivery{
+			EnrichedEvent: core.EnrichedEvent{
+				Event: core.Event{ID: fftypes.NewUUID()},
+			},
+			Subscription: core.SubscriptionRef{
+				ID:        fftypes.NewUUID(),
+				Namespace: "ns1",
+			},
+		}},
+	})
+
+	b := <-wsc.Receive()
+	var deliveredBatch core.WSEventBatch
+	err := json.Unmarshal(b, &deliveredBatch)
+	assert.NoError(t, err)
+	assert.Len(t, deliveredBatch.Events, 1)
+
+	err = wsc.Send(context.Background(), []byte(`{"type":"ack", "id": "`+deliveredBatch.ID.String()+`"}`))
+	assert.NoError(t, err)
+
+	<-waitAcked
+	cbs.AssertExpectations(t)
+}
+
 func TestAutoStartBadOptions(t *testing.T) {
 	cbs := &eventsmocks.Callbacks{}
 	_, wsc, cancel := newTestWebsockets(t, cbs, nil, "name=missingnamespace")
@@ -491,6 +680,29 @@ func TestAutoStartBadOptions(t *testing.T) {
 	cbs.AssertExpectations(t)
 }
 
+func TestAutoStartCustomReadAheadBatch(t *testing.T) {
+	cbs := &eventsmocks.Callbacks{}
+
+	subscribedConn := make(chan string, 1)
+	cbs.On("EphemeralSubscription",
+		mock.MatchedBy(func(s string) bool {
+			subscribedConn <- s
+			return true
+		}),
+		"ns1",
+		mock.Anything,
+		mock.MatchedBy(func(o *core.SubscriptionOptions) bool {
+			return *o.ReadAhead == 42 && *o.BatchTimeout == "1s"
+		}),
+	).Return(nil)
+
+	_, _, cancel := newTestWebsockets(t, cbs, nil, "namespace=ns1", "ephemeral", "batch", "batchtimeout=1s", "readahead=42")
+	defer cancel()
+
+	<-subscribedConn
+
+}
+
 func TestAutoStartBadNamespace(t *testing.T) {
 	cbs := &eventsmocks.Callbacks{}
 	_, wsc, cancel := newTestWebsockets(t, cbs, nil, "ephemeral", "namespace=ns2")
@@ -523,6 +735,30 @@ func TestHandleAckWithAutoAck(t *testing.T) {
 	assert.Regexp(t, "FF10180", err)
 }
 
+func TestHandleBatchNotMatch(t *testing.T) {
+	eventUUID := fftypes.NewUUID()
+	wsc := &websocketConnection{
+		ctx: context.Background(),
+		started: []*websocketStartedSub{{WSStart: core.WSStart{
+			Ephemeral: false, Name: "name1", Namespace: "ns1",
+		}}},
+		sendMessages: make(chan interface{}, 1),
+		inflight: []*core.EventDeliveryResponse{
+			{ID: eventUUID},
+		},
+		inflightBatches: []*core.WSEventBatch{
+			{ID: fftypes.NewUUID()},
+		},
+		autoAck: true,
+	}
+	err := wsc.handleAck(&core.WSAck{
+		ID: eventUUID,
+	})
+	assert.Regexp(t, "FF10180", err)
+	assert.Len(t, wsc.inflight, 1)
+	assert.Len(t, wsc.inflightBatches, 1)
+}
+
 func TestHandleStartFlippingAutoAck(t *testing.T) {
 	eventUUID := fftypes.NewUUID()
 	wsc := &websocketConnection{
@@ -665,6 +901,16 @@ func TestConnectionDispatchAfterClose(t *testing.T) {
 	assert.Regexp(t, "FF00147", err)
 }
 
+func TestConnectionDispatchBatchAfterClose(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	wsc := &websocketConnection{
+		ctx: ctx,
+	}
+	err := wsc.dispatchBatch(&core.Subscription{}, []*core.CombinedEventDataDelivery{})
+	assert.Regexp(t, "FF00147", err)
+}
+
 func TestWebsocketDispatchAfterClose(t *testing.T) {
 	ws := &WebSockets{
 		ctx:         context.Background(),
@@ -674,6 +920,15 @@ func TestWebsocketDispatchAfterClose(t *testing.T) {
 	assert.Regexp(t, "FF10173", err)
 }
 
+func TestWebsocketBatchDispatchAfterClose(t *testing.T) {
+	ws := &WebSockets{
+		ctx:         context.Background(),
+		connections: make(map[string]*websocketConnection),
+	}
+	err := ws.BatchDeliveryRequest(ws.ctx, "gone", nil, []*core.CombinedEventDataDelivery{})
+	assert.Regexp(t, "FF10173", err)
+}
+
 func TestDispatchAutoAck(t *testing.T) {
 	cbs := &eventsmocks.Callbacks{}
 	cbs.On("DeliveryResponse", mock.Anything, mock.Anything).Return(nil)
@@ -826,21 +1081,6 @@ func TestNamespaceRestartedFailClose(t *testing.T) {
 	mcb.AssertExpectations(t)
 }
 
-func TestEventDeliveryBatchReturnsUnsupported(t *testing.T) {
-	cbs := &eventsmocks.Callbacks{}
-	ws, _, cancel := newTestWebsockets(t, cbs, nil)
-	defer cancel()
-
-	sub := &core.Subscription{
-		SubscriptionRef: core.SubscriptionRef{
-			Namespace: "ns1",
-		},
-	}
-
-	err := ws.BatchDeliveryRequest(ws.ctx, "id", sub, []*core.CombinedEventDataDelivery{})
-	assert.Regexp(t, "FF10461", err)
-}
-
 func TestNamespaceScopedSendWrongNamespaceStartAction(t *testing.T) {
 	cbs := &eventsmocks.Callbacks{}
 	_, wsc, cancel := newTestWebsocketsCommon(t, cbs, nil, "ns1")
@@ -889,23 +1129,22 @@ func TestNamespaceScopedSuccess(t *testing.T) {
 	ws, wsc, cancel := newTestWebsocketsCommon(t, cbs, nil, "ns1")
 	defer cancel()
 	var connID string
-	sub := cbs.On("RegisterConnection",
+	waitSubscribed := make(chan struct{})
+
+	cbs.On("RegisterConnection",
 		mock.MatchedBy(func(s string) bool { connID = s; return true }),
-		mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
-			return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) &&
-				!subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
-				!subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
-		}),
-	).Return(nil)
+		mock.Anything,
+	).Return(nil).Run(func(args mock.Arguments) {
+		subMatch := args[1].(events.SubscriptionMatcher)
+		assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+		assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+		close(waitSubscribed)
+	})
 	ack := cbs.On("DeliveryResponse",
 		mock.MatchedBy(func(s string) bool { return s == connID }),
 		mock.Anything).Return(nil)
 
-	waitSubscribed := make(chan struct{})
-	sub.RunFn = func(a mock.Arguments) {
-		close(waitSubscribed)
-	}
-
 	waitAcked := make(chan struct{})
 	ack.RunFn = func(a mock.Arguments) {
 		close(waitAcked)
diff --git a/internal/namespace/configreload.go b/internal/namespace/configreload.go
index 486c111eca..1556fb2f66 100644
--- a/internal/namespace/configreload.go
+++ b/internal/namespace/configreload.go
@@ -61,6 +61,8 @@ func (nm *namespaceManager) configFileChanged() {
 }
 
 func (nm *namespaceManager) configReloaded(ctx context.Context) {
+	// Always make sure log level is up to date
+	log.SetLevel(config.GetString(config.LogLevel))
 
 	// Get Viper to dump the whole new config, with everything resolved across env vars
 	// and the config file etc.
diff --git a/pkg/core/subscription.go b/pkg/core/subscription.go
index 7af6fc85e7..e9e789d1dc 100644
--- a/pkg/core/subscription.go
+++ b/pkg/core/subscription.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -88,6 +88,7 @@ const (
 )
 
 // SubscriptionCoreOptions are the core options that apply across all transports
+// REMEMBER TO ADD OPTIONS HERE TO MarshalJSON()
 type SubscriptionCoreOptions struct {
 	FirstEvent   *SubOptsFirstEvent `ffstruct:"SubscriptionCoreOptions" json:"firstEvent,omitempty"`
 	ReadAhead    *uint16            `ffstruct:"SubscriptionCoreOptions" json:"readAhead,omitempty"`
@@ -167,6 +168,12 @@ func (so SubscriptionOptions) MarshalJSON() ([]byte, error) {
 	if so.TLSConfigName != "" {
 		so.additionalOptions["tlsConfigName"] = so.TLSConfigName
 	}
+	if so.Batch != nil {
+		so.additionalOptions["batch"] = so.Batch
+	}
+	if so.BatchTimeout != nil {
+		so.additionalOptions["batchTimeout"] = so.BatchTimeout
+	}
 
 	return json.Marshal(&so.additionalOptions)
 }
diff --git a/pkg/core/subscription_test.go b/pkg/core/subscription_test.go
index 0165dc5939..45d1ebf00c 100644
--- a/pkg/core/subscription_test.go
+++ b/pkg/core/subscription_test.go
@@ -28,12 +28,15 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) {
 	firstEvent := SubOptsFirstEventNewest
 	readAhead := uint16(50)
 	yes := true
+	oneSec := "1s"
 	sub1 := &Subscription{
 		Options: SubscriptionOptions{
 			SubscriptionCoreOptions: SubscriptionCoreOptions{
-				FirstEvent: &firstEvent,
-				ReadAhead:  &readAhead,
-				WithData:   &yes,
+				FirstEvent:   &firstEvent,
+				ReadAhead:    &readAhead,
+				WithData:     &yes,
+				Batch:        &yes,
+				BatchTimeout: &oneSec,
 			},
 			WebhookSubOptions: WebhookSubOptions{
 				TLSConfigName: "myconfig",
@@ -49,7 +52,18 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) {
 	// Verify it serializes as bytes to the database
 	b1, err := sub1.Options.Value()
 	assert.NoError(t, err)
-	assert.Equal(t, `{"firstEvent":"newest","my-nested-opts":{"myopt1":12345,"myopt2":"test"},"readAhead":50,"tlsConfigName":"myconfig","withData":true}`, string(b1.([]byte)))
+	assert.JSONEq(t, `{
+		"firstEvent":"newest",
+		"my-nested-opts":{
+			"myopt1":12345,
+			"myopt2":"test"
+		},
+		"readAhead":50,
+		"tlsConfigName":"myconfig",
+		"withData":true,
+		"batch":true,
+		"batchTimeout":"1s"
+	}`, string(b1.([]byte)))
 
 	f1, err := sub1.Filter.Value()
 	assert.NoError(t, err)
diff --git a/pkg/core/websocket_actions.go b/pkg/core/websocket_actions.go
index d02fb9940d..5511489460 100644
--- a/pkg/core/websocket_actions.go
+++ b/pkg/core/websocket_actions.go
@@ -1,4 +1,4 @@
-// Copyright © 2022 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -29,6 +29,9 @@ var (
 
 	// WSProtocolErrorEventType is a special event "type" field for server to send the client, if it performs a ProtocolError
 	WSProtocolErrorEventType = fftypes.FFEnumValue("wstype", "protocol_error")
+
+	// WSEventBatchType is the type set when the message contains an array of events
+	WSEventBatchType = fftypes.FFEnumValue("wstype", "event_batch")
 )
 
 // WSActionBase is the base fields of all client actions sent on the websocket
@@ -61,3 +64,12 @@ type WSError struct {
 	Type  WSClientPayloadType `ffstruct:"WSAck" json:"type" ffenum:"wstype"`
 	Error string              `ffstruct:"WSAck" json:"error"`
 }
+
+// WSEventBatch is used when batched delivery is enabled over the websocket, allowing
+// an array of events to be ack'd as a whole (rather than ack'ing individually)
+type WSEventBatch struct {
+	Type         WSClientPayloadType `ffstruct:"WSEventBatch" json:"type" ffenum:"wstype"`
+	ID           *fftypes.UUID       `ffstruct:"WSEventBatch" json:"id"`
+	Subscription SubscriptionRef     `ffstruct:"WSEventBatch" json:"subscription"`
+	Events       []*EventDelivery    `ffstruct:"WSEventBatch" json:"events"`
+}