Skip to content

Commit

Permalink
Enable contract listeners with multiple filters
Browse files Browse the repository at this point in the history
Signed-off-by: Nicko Guyer <[email protected]>
  • Loading branch information
nguyer committed Oct 12, 2023
1 parent 13bf4e1 commit 05171de
Show file tree
Hide file tree
Showing 16 changed files with 1,463 additions and 341 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE contractlisteners DROP COLUMN filters;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
ALTER TABLE contractlisteners ALTER COLUMN event DROP NOT NULL;
ALTER TABLE contractlisteners ALTER COLUMN location DROP NOT NULL;
COMMIT:
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE contractlisteners DROP COLUMN filters;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;

-- SQLite doesn't support dropping NOT NULL constraints so we have to move
-- everything to a temp colum, create a new column, then copy the data back
ALTER TABLE contractlisteners RENAME COLUMN event TO event_tmp;
ALTER TABLE contractlisteners RENAME COLUMN location TO location_tmp;
ALTER TABLE contractlisteners ADD COLUMN event TEXT;
ALTER TABLE contractlisteners ADD LOCATION event TEXT;
UPDATE contractlisteners SET event = event_tmp;
UPDATE contractlisteners SET location = location_tmp;
ALTER TABLE contractlisteners DROP COLUMN event_tmp;
ALTER TABLE contractlisteners DROP COLUMN location_tmp;
17 changes: 14 additions & 3 deletions docs/reference/types/contractlistener.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ nav_order: 10
| Field Name | Description | Type |
|------------|-------------|------|
| `id` | The UUID of the smart contract listener | [`UUID`](simpletypes#uuid) |
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
| `interface` | Deprecated: Please use 'interface' in the array of 'filters' instead | [`FFIReference`](#ffireference) |
| `namespace` | The namespace of the listener, which defines the namespace of all blockchain events detected by this listener | `string` |
| `name` | A descriptive name for the listener | `string` |
| `backendId` | An ID assigned by the blockchain connector to this listener | `string` |
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes#jsonany) |
| `location` | Deprecated: Please use 'location' in the array of 'filters' instead | [`JSONAny`](simpletypes#jsonany) |
| `created` | The creation time of the listener | [`FFTime`](simpletypes#fftime) |
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
| `event` | Deprecated: Please use 'event' in the array of 'filters' instead | [`FFISerializedEvent`](#ffiserializedevent) |
| `filters` | A list of filters for the contract listener. Each filter is made up of an Event and an optional Location. Events matching these filters will always be emitted in the order determined by the blockchain. | [`ListenerFilter[]`](#listenerfilter) |
| `signature` | The stringified signature of the event, as computed by the blockchain plugin | `string` |
| `topic` | A topic to set on the FireFly event that is emitted each time a blockchain event is detected from the blockchain. Setting this topic on a number of listeners allows applications to easily subscribe to all events they need | `string` |
| `options` | Options that control how the listener subscribes to events from the underlying blockchain | [`ContractListenerOptions`](#contractlisteneroptions) |
Expand Down Expand Up @@ -102,6 +103,16 @@ nav_order: 10



## ListenerFilter

| Field Name | Description | Type |
|------------|-------------|------|
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes#jsonany) |
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
| `signature` | The stringified signature of the event, as computed by the blockchain plugin | `string` |


## ContractListenerOptions

| Field Name | Description | Type |
Expand Down
1,281 changes: 1,071 additions & 210 deletions docs/swagger/swagger.yaml

Large diffs are not rendered by default.

37 changes: 33 additions & 4 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,17 +881,46 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr
return err
}
}
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)

filters := make([]*filter, 0)
if listener.Event != nil {
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
}
evmFilter := &filter{
Event: abi,
}
if location != nil {
evmFilter.Address = location.Address
}
filters = append(filters, evmFilter)
} else {
for _, f := range listener.Filters {
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &f.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
}
evmFilter := &filter{
Event: abi,
}
if f.Location != nil {
location, err = e.parseContractLocation(ctx, f.Location)
if err != nil {
return err
}
evmFilter.Address = location.Address
}
filters = append(filters, evmFilter)
}
}

subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
firstEvent := string(core.SubOptsFirstEventNewest)
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}
result, err := e.streams.createSubscription(ctx, location, e.streamID, subName, firstEvent, abi)
result, err := e.streams.createSubscription(ctx, e.streamID, subName, firstEvent, filters)
if err != nil {
return err
}
Expand Down
119 changes: 118 additions & 1 deletion internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,98 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) {
assert.NoError(t, err)
}

func TestAddSubscriptionMultipleFilters(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()
e.streamID = "es-1"
e.streams = &streamManager{
client: e.client,
}

sub := &core.ContractListener{
Filters: core.ListenerFilters{
{
Event: &core.FFISerializedEvent{
FFIEventDefinition: fftypes.FFIEventDefinition{
Name: "Changed",
Params: fftypes.FFIParams{
{
Name: "value",
Schema: fftypes.JSONAnyPtr(`{"type": "string", "details": {"type": "string"}}`),
},
},
},
},
},
{
Event: &core.FFISerializedEvent{
FFIEventDefinition: fftypes.FFIEventDefinition{
Name: "Changed2",
Params: fftypes.FFIParams{
{
Name: "value2",
Schema: fftypes.JSONAnyPtr(`{"type": "string", "details": {"type": "string"}}`),
},
},
},
},
Location: fftypes.JSONAnyPtr(`{"address":"0x1234"}`),
},
},
Options: &core.ContractListenerOptions{
FirstEvent: string(core.SubOptsFirstEventOldest),
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)

assert.NoError(t, err)
}

func TestAddSubscriptionInvalidAbi(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()
e.streamID = "es-1"
e.streams = &streamManager{
client: e.client,
}

sub := &core.ContractListener{
Filters: core.ListenerFilters{
{
Location: fftypes.JSONAnyPtr(fftypes.JSONObject{
"address": "0x123",
}.String()),
Event: &core.FFISerializedEvent{
FFIEventDefinition: fftypes.FFIEventDefinition{
Name: "Changed",
Params: fftypes.FFIParams{
{
Name: "value",
Schema: fftypes.JSONAnyPtr(`"not an abi"`),
},
},
},
},
},
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10311", err)
}

func TestAddSubscriptionBadParamDetails(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
Expand Down Expand Up @@ -2020,7 +2112,7 @@ func TestAddSubscriptionBadParamDetails(t *testing.T) {
assert.Regexp(t, "FF10311", err)
}

func TestAddSubscriptionBadLocation(t *testing.T) {
func TestAddLegacySubscriptionBadLocation(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
Expand All @@ -2041,6 +2133,31 @@ func TestAddSubscriptionBadLocation(t *testing.T) {
assert.Regexp(t, "FF10310", err)
}

func TestAddSubscriptionBadLocation(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

e.streamID = "es-1"
e.streams = &streamManager{
client: e.client,
}

sub := &core.ContractListener{
Filters: core.ListenerFilters{
{
Location: fftypes.JSONAnyPtr(""),
Event: &core.FFISerializedEvent{},
},
},
}

err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10310", err)
}

func TestAddSubscriptionFail(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
Expand Down
42 changes: 24 additions & 18 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-signer/pkg/abi"
Expand Down Expand Up @@ -52,16 +51,21 @@ type eventStream struct {
}

type subscription struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
Stream string `json:"stream"`
FromBlock string `json:"fromBlock"`
EthCompatAddress string `json:"address,omitempty"`
EthCompatEvent *abi.Entry `json:"event,omitempty"`
Filters []fftypes.JSONAny `json:"filters"`
ID string `json:"id"`
Name string `json:"name,omitempty"`
Stream string `json:"stream"`
FromBlock string `json:"fromBlock"`
EthCompatAddress string `json:"address,omitempty"`
EthCompatEvent *abi.Entry `json:"event,omitempty"`
Filters []*filter `json:"filters"`
subscriptionCheckpoint
}

type filter struct {
Event *abi.Entry `json:"event"`
Address string `json:"address"`
}

type subscriptionCheckpoint struct {
Checkpoint ListenerCheckpoint `json:"checkpoint,omitempty"`
Catchup bool `json:"catchup,omitempty"`
Expand Down Expand Up @@ -182,7 +186,7 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
return sub.Name, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) {
func (s *streamManager) createSubscription(ctx context.Context, stream, subName, firstEvent string, filters []*filter) (*subscription, error) {
// Map FireFly "firstEvent" values to Ethereum "fromBlock" values
switch firstEvent {
case string(core.SubOptsFirstEventOldest):
Expand All @@ -191,14 +195,10 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati
firstEvent = "latest"
}
sub := subscription{
Name: subName,
Stream: stream,
FromBlock: firstEvent,
EthCompatEvent: abi,
}

if location != nil {
sub.EthCompatAddress = location.Address
Name: subName,
Stream: stream,
FromBlock: firstEvent,
Filters: filters,
}

res, err := s.client.R().
Expand Down Expand Up @@ -267,7 +267,13 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace
name = v1Name
}
location := &Location{Address: instancePath}
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil {
filters := []*filter{
{
Event: abi,
Address: location.Address,
},
}
if sub, err = s.createSubscription(ctx, stream, name, firstEvent, filters); err != nil {
return nil, err
}
log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID)
Expand Down
Loading

0 comments on commit 05171de

Please sign in to comment.