Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor maybePersistBlockchainEvent to Return a Boolean Indicating Event Creation Status #1559

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/events/blockchain_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,21 @@ func (em *eventManager) getChainListenerByProtocolIDCached(ctx context.Context,
return l, nil
}

func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error {
// handleBlockchainBatchPinEvent handles a blockchain event, returning true if the event was created, false if it was a duplicate along with an error if any failures occur
func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) (bool, error) {
existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent)
if err != nil {
return err
return false, err
}
if existing != nil {
log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID)
// Return the ID of the existing event
chainEvent.ID = existing.ID
return nil
return false, nil
}
topic := em.getTopicForChainListener(listener)
ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic)
return em.database.InsertEvent(ctx, ffEvent)
return true, em.database.InsertEvent(ctx, ffEvent)
}

func (em *eventManager) getChainListenerCached(cacheKey string, getter func() (*core.ContractListener, error)) (*core.ContractListener, error) {
Expand Down
4 changes: 3 additions & 1 deletion internal/events/blockchain_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func TestContractEventWrongNS(t *testing.T) {

}

// TODO: Add test case for event not existing
dwertent marked this conversation as resolved.
Show resolved Hide resolved
func TestPersistBlockchainEventDuplicate(t *testing.T) {
em := newTestEventManager(t)
defer em.cleanup(t)
Expand All @@ -173,9 +174,10 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
em.mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev).
Return(&core.BlockchainEvent{ID: existingID}, nil)

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
created, err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.NoError(t, err)
assert.Equal(t, existingID, ev.ID)
assert.False(t, created)

}

Expand Down
2 changes: 1 addition & 1 deletion internal/events/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func TestEventFilterOnSubscriptionMatchesEventType(t *testing.T) {
filteredEvents, _ = em.FilterHistoricalEventsOnSubscription(context.Background(), events, subscription)
assert.NotNil(t, filteredEvents)
assert.Equal(t, 1, len(filteredEvents))

listenerUuid := fftypes.NewUUID()

events[0].Event.Topic = ""
Expand Down
7 changes: 5 additions & 2 deletions internal/events/token_pool_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ func (em *eventManager) confirmPool(ctx context.Context, pool *core.TokenPool, e
Type: pool.TX.Type,
BlockchainID: blockchainID,
})
if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil {
created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil)
if err != nil {
return err
}
em.emitBlockchainEventMetric(ev)
if created {
em.emitBlockchainEventMetric(ev)
}
}
if _, err := em.txHelper.PersistTransaction(ctx, pool.TX.ID, pool.TX.Type, blockchainID); err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions internal/events/tokens_approved.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -97,10 +97,13 @@ func (em *eventManager) persistTokenApproval(ctx context.Context, approval *toke
Type: approval.TX.Type,
BlockchainID: approval.Event.BlockchainTXID,
})
if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil {
created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil)
if err != nil {
return false, err
}
em.emitBlockchainEventMetric(approval.Event)
if created {
em.emitBlockchainEventMetric(approval.Event)
}
approval.BlockchainEvent = chainEvent.ID

fb := database.TokenApprovalQueryFactory.NewFilter(ctx)
Expand Down
9 changes: 6 additions & 3 deletions internal/events/tokens_transferred.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -89,10 +89,13 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke
Type: transfer.TX.Type,
BlockchainID: transfer.Event.BlockchainTXID,
})
if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil {
created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil)
if err != nil {
return false, err
}
em.emitBlockchainEventMetric(transfer.Event)
if created {
em.emitBlockchainEventMetric(transfer.Event)
}
transfer.BlockchainEvent = chainEvent.ID

// This is a no-op if we've already persisted this token transfer
Expand Down
Loading