Skip to content

Commit

Permalink
feat: removed callback-based subscription system (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
glouvigny authored Mar 3, 2020
1 parent 90e6210 commit c507f70
Show file tree
Hide file tree
Showing 17 changed files with 504 additions and 328 deletions.
16 changes: 9 additions & 7 deletions accesscontroller/orbitdb/accesscontroller_orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,14 @@ func (o *orbitDBAccessController) Load(ctx context.Context, address string) erro

o.kvStore = store

go o.kvStore.Subscribe(ctx, func(e events.Event) {
switch e.(type) {
case stores.EventReady, stores.EventWrite, stores.EventReplicated:
o.onUpdate()
go func() {
for e := range o.kvStore.Subscribe(ctx) {
switch e.(type) {
case stores.EventReady, stores.EventWrite, stores.EventReplicated:
o.onUpdate(ctx)
}
}
})
}()

err = o.kvStore.Load(ctx, -1)
if err != nil {
Expand All @@ -217,8 +219,8 @@ func (o *orbitDBAccessController) Close() error {
return nil
}

func (o *orbitDBAccessController) onUpdate() {
o.Emit(&EventUpdated{})
// onUpdate emits an EventUpdated for this access controller; it is invoked
// from the kvStore subscription loop on ready/write/replicated store events.
// NOTE(review): ctx is forwarded to Emit per the new ctx-aware event API —
// presumably used for cancellation of delivery; confirm against the events package.
func (o *orbitDBAccessController) onUpdate(ctx context.Context) {
o.Emit(ctx, &EventUpdated{})
}

// NewIPFSAccessController Returns a default access controller for OrbitDB database
Expand Down
191 changes: 94 additions & 97 deletions baseorbitdb/orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"berty.tech/go-orbit-db/address"
"berty.tech/go-orbit-db/cache"
"berty.tech/go-orbit-db/cache/cacheleveldown"
"berty.tech/go-orbit-db/events"
"berty.tech/go-orbit-db/iface"
_ "berty.tech/go-orbit-db/internal/buildconstraints" // fail for bad go version
"berty.tech/go-orbit-db/pubsub"
Expand Down Expand Up @@ -778,92 +777,90 @@ func (o *orbitDB) onClose(ctx context.Context, addr cid.Cid) error {
}

func (o *orbitDB) storeListener(ctx context.Context, store Store) {
go store.Subscribe(ctx, func(evt events.Event) {
switch evt.(type) {
case *stores.EventClosed:
logger().Debug("received stores.close event")
go func() {
for evt := range store.Subscribe(ctx) {
switch e := evt.(type) {
case *stores.EventWrite:
logger().Debug("received stores.write event")
if len(e.Heads) == 0 {
logger().Debug(fmt.Sprintf("'heads' are not defined"))
continue
}

e := evt.(*stores.EventClosed)
err := o.onClose(ctx, e.Address.GetRoot())
logger().Debug(fmt.Sprintf("unable to perform onClose %v", err))
if ps := o.PubSub(); ps != nil {
headsBytes, err := json.Marshal(e.Heads)
if err != nil {
logger().Debug(fmt.Sprintf("unable to serialize heads %v", err))
continue
}

case *stores.EventWrite:
logger().Debug("received stores.write event")
e := evt.(*stores.EventWrite)
if len(e.Heads) == 0 {
logger().Debug(fmt.Sprintf("'heads' are not defined"))
return
}
err = ps.Publish(ctx, e.Address.String(), headsBytes)
if err != nil {
logger().Debug(fmt.Sprintf("unable to publish message on pubsub %v", err))
continue
}

if ps := o.PubSub(); ps != nil {
headsBytes, err := json.Marshal(e.Heads)
if err != nil {
logger().Debug(fmt.Sprintf("unable to serialize heads %v", err))
return
logger().Debug("stores.write event: published event on pub sub")
}
}
}

err = ps.Publish(ctx, e.Address.String(), headsBytes)
if err != nil {
logger().Debug(fmt.Sprintf("unable to publish message on pubsub %v", err))
return
}
logger().Debug("received stores.close event")

logger().Debug("stores.write event: published event on pub sub")
}
if err := o.onClose(ctx, store.Address().GetRoot()); err != nil {
logger().Debug(fmt.Sprintf("unable to perform onClose %v", err))
}
})
}()
}

func (o *orbitDB) pubSubChanListener(ctx context.Context, sub pubsub.Subscription, addr address.Address) {
go sub.Subscribe(ctx, func(e events.Event) {
logger().Debug("Got pub sub message")
switch e.(type) {
case *pubsub.MessageEvent:
evt := e.(*pubsub.MessageEvent)
func (o *orbitDB) pubSubChanListener(ctx context.Context, ps pubsub.Subscription, addr address.Address) {
go func() {
for e := range ps.Subscribe(ctx) {
logger().Debug("Got pub sub message")
switch evt := e.(type) {
case *pubsub.MessageEvent:
addr := evt.Topic
store, ok := o.getStore(addr)

addr := evt.Topic
store, ok := o.getStore(addr)

if !ok {
logger().Error(fmt.Sprintf("unable to find store for address %s", addr))
return
}
if !ok {
logger().Error(fmt.Sprintf("unable to find store for address %s", addr))
continue
}

headsEntriesBytes := evt.Content
var headsEntries []*entry.Entry
headsEntriesBytes := evt.Content
var headsEntries []*entry.Entry

err := json.Unmarshal(headsEntriesBytes, &headsEntries)
if err != nil {
logger().Error("unable to unmarshal head entries")
}
err := json.Unmarshal(headsEntriesBytes, &headsEntries)
if err != nil {
logger().Error("unable to unmarshal head entries")
}

if len(headsEntries) == 0 {
logger().Debug(fmt.Sprintf("Nothing to synchronize for %s:", addr))
}
if len(headsEntries) == 0 {
logger().Debug(fmt.Sprintf("Nothing to synchronize for %s:", addr))
}

logger().Debug(fmt.Sprintf("Received %d heads for %s:", len(headsEntries), addr))
logger().Debug(fmt.Sprintf("Received %d heads for %s:", len(headsEntries), addr))

entries := make([]ipfslog.Entry, len(headsEntries))
for i := range headsEntries {
entries[i] = headsEntries[i]
}
entries := make([]ipfslog.Entry, len(headsEntries))
for i := range headsEntries {
entries[i] = headsEntries[i]
}

if err := store.Sync(ctx, entries); err != nil {
logger().Debug(fmt.Sprintf("Error while syncing heads for %s:", addr))
}
case *peermonitor.EventPeerJoin:
evt := e.(*peermonitor.EventPeerJoin)
o.onNewPeerJoined(ctx, evt.Peer, addr)
logger().Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
if err := store.Sync(ctx, entries); err != nil {
logger().Debug(fmt.Sprintf("Error while syncing heads for %s:", addr))
}
case *peermonitor.EventPeerJoin:
o.onNewPeerJoined(ctx, evt.Peer, addr)
logger().Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))

case *peermonitor.EventPeerLeave:
evt := e.(*peermonitor.EventPeerLeave)
logger().Debug(fmt.Sprintf("peer %s left from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
case *peermonitor.EventPeerLeave:
logger().Debug(fmt.Sprintf("peer %s left from %s self is %s", evt.Peer.String(), addr, o.PeerID()))

default:
logger().Debug("unhandled event, can't match type")
default:
logger().Debug("unhandled event, can't match type")
}
}
})
}()
}

func (o *orbitDB) onNewPeerJoined(ctx context.Context, p p2pcore.PeerID, addr address.Address) {
Expand All @@ -888,7 +885,7 @@ func (o *orbitDB) onNewPeerJoined(ctx context.Context, p p2pcore.PeerID, addr ad
return
}

store.Emit(stores.NewEventNewPeer(p))
store.Emit(ctx, stores.NewEventNewPeer(p))
}

func (o *orbitDB) exchangeHeads(ctx context.Context, p p2pcore.PeerID, addr address.Address) (oneonone.Channel, error) {
Expand Down Expand Up @@ -942,42 +939,42 @@ func (o *orbitDB) exchangeHeads(ctx context.Context, p p2pcore.PeerID, addr addr
}

func (o *orbitDB) watchOneOnOneMessage(ctx context.Context, channel oneonone.Channel) {
go channel.Subscribe(ctx, func(evt events.Event) {
logger().Debug("received one on one message")

switch evt.(type) {
case *oneonone.EventMessage:
e := evt.(*oneonone.EventMessage)
go func() {
for evt := range channel.Subscribe(ctx) {
logger().Debug("received one on one message")

switch e := evt.(type) {
case *oneonone.EventMessage:
heads := &exchangedHeads{}
err := json.Unmarshal(e.Payload, &heads)
if err != nil {
logger().Error("unable to unmarshal heads", zap.Error(err))
}

heads := &exchangedHeads{}
err := json.Unmarshal(e.Payload, &heads)
if err != nil {
logger().Error("unable to unmarshal heads", zap.Error(err))
}
logger().Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads.Heads), heads.Address))
store, ok := o.getStore(heads.Address)

logger().Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads.Heads), heads.Address))
store, ok := o.getStore(heads.Address)
if !ok {
logger().Debug("Heads from unknown store, skipping")
return
}

if !ok {
logger().Debug("Heads from unknown store, skipping")
return
}
if len(heads.Heads) > 0 {
untypedHeads := make([]ipfslog.Entry, len(heads.Heads))
for i := range heads.Heads {
untypedHeads[i] = heads.Heads[i]
}

if len(heads.Heads) > 0 {
untypedHeads := make([]ipfslog.Entry, len(heads.Heads))
for i := range heads.Heads {
untypedHeads[i] = heads.Heads[i]
if err := store.Sync(ctx, untypedHeads); err != nil {
logger().Error("unable to sync heads", zap.Error(err))
}
}

if err := store.Sync(ctx, untypedHeads); err != nil {
logger().Error("unable to sync heads", zap.Error(err))
}
default:
logger().Debug("unhandled event type")
}

default:
logger().Debug("unhandled event type")
}
})
}()
}

var _ BaseOrbitDB = &orbitDB{}
Loading

0 comments on commit c507f70

Please sign in to comment.