From 7ab1b8048fcbaa7c8457ade1a8e4c082f0081d59 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Thu, 4 May 2023 09:13:24 +0200 Subject: [PATCH] feat: add eventbus subscriber name Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- accesscontroller/orbitdb/accesscontroller_orbitdb.go | 3 ++- baseorbitdb/orbitdb.go | 3 ++- stores/basestore/base_store.go | 5 +++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/accesscontroller/orbitdb/accesscontroller_orbitdb.go b/accesscontroller/orbitdb/accesscontroller_orbitdb.go index a976873..69b506f 100644 --- a/accesscontroller/orbitdb/accesscontroller_orbitdb.go +++ b/accesscontroller/orbitdb/accesscontroller_orbitdb.go @@ -16,6 +16,7 @@ import ( cid "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "go.uber.org/zap" ) @@ -217,7 +218,7 @@ func (o *orbitDBAccessController) Load(ctx context.Context, address string) erro new(stores.EventWrite), new(stores.EventReady), new(stores.EventReplicated), - }) + }, eventbus.Name("odb/load")) if err != nil { return fmt.Errorf("unable subscribe to store events: %w", err) } diff --git a/baseorbitdb/orbitdb.go b/baseorbitdb/orbitdb.go index 98b85fc..5461242 100644 --- a/baseorbitdb/orbitdb.go +++ b/baseorbitdb/orbitdb.go @@ -813,7 +813,8 @@ func (o *orbitDB) EventBus() event.Bus { } func (o *orbitDB) monitorDirectChannel(ctx context.Context, bus event.Bus) error { - sub, err := bus.Subscribe(new(iface.EventPubSubPayload), eventbus.BufSize(128)) + sub, err := bus.Subscribe(new(iface.EventPubSubPayload), + eventbus.BufSize(128), eventbus.Name("odb/monitor-direct-channel")) if err != nil { return fmt.Errorf("unable to init pubsub subscriber: %w", err) } diff --git a/stores/basestore/base_store.go b/stores/basestore/base_store.go index 893c043..45880b8 100644 --- a/stores/basestore/base_store.go +++ b/stores/basestore/base_store.go @@ -268,7 +268,8 @@ func (b *BaseStore) InitBaseStore(ipfs coreapi.CoreAPI, identity *identityprovid b.options = options - sub, err := b.replicator.EventBus().Subscribe(replicator.Events, eventbus.BufSize(128)) + sub, err := b.replicator.EventBus().Subscribe(replicator.Events, + eventbus.BufSize(128), eventbus.Name("odb/base-store-main-loop")) if err != nil { return fmt.Errorf("unable to subscribe to replicator events: %w", err) } @@ -1008,7 +1009,7 @@ func (b *BaseStore) replicate() error { } func (b *BaseStore) storeListener(topic iface.PubSubTopic) error { - sub, err := b.EventBus().Subscribe(new(stores.EventWrite)) + sub, err := b.EventBus().Subscribe(new(stores.EventWrite), eventbus.Name("odb/store-listener")) if err != nil { return fmt.Errorf("unable to init event bus: %w", err) }