Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR 1424 and 1447 #100

Merged
merged 10 commits into from
Jan 19, 2024
Prev Previous commit
Ensure nacks sent for whole batch
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
peterbroadhurst committed Jan 19, 2024
commit 71762c9f21c7d5e52b5fc3bab116cc91d7b5e045
39 changes: 25 additions & 14 deletions internal/events/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -398,7 +398,6 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
}
}

// TODO issue here, we can't just call DeliveryRequest with one thing.
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
for {
@@ -410,30 +409,42 @@ func (ed *eventDispatcher) deliverEvents() {

// As soon as we hit an error, we need to trigger into nack mode
var err error

// Loop through the events enriching them, and dispatching individually in non-batch mode
eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
for i := 0; i < len(events) && err == nil; i++ {
for i := 0; i < len(events); i++ {
e := &core.CombinedEventDataDelivery{
Event: events[i],
}
eventsWithData[i] = e
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
// Individual events (in reality there is only ever i==0 for this case)
if err == nil && !ed.batch {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
// The first error we encounter stops us attempting to enrich or dispatch any more events
if err == nil {
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
}
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
// If we are non-batched, we have to deliver each event individually...
if !ed.batch {
// .. only attempt to deliver if we've not triggered into an error scenario for one of the events already
if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
}
// ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

// In batch mode we do one dispatch of the whole set as one
if err == nil && ed.batch {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
if ed.batch {
// Only attempt to deliver if we're in a non error case (enrich might have failed above)
if err == nil {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
}
// If we're in an error case we have to nack everything immediately
if err != nil {
// nack everything on behalf of the failed delivery
for _, e := range events {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}