Skip to content

Commit

Permalink
Merge pull request #84 from gfanton/fix/directchannel
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored Dec 9, 2021
2 parents c8ecbcc + 9d17cd8 commit 028fc94
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 146 deletions.
227 changes: 83 additions & 144 deletions pubsub/directchannel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,36 @@ import (
"fmt"
"io"
"math"
"strings"
"sync"
"time"

p2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
"go.uber.org/zap"

"berty.tech/go-orbit-db/events"
"berty.tech/go-orbit-db/iface"
"berty.tech/go-orbit-db/pubsub"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/zap"
)

const PROTOCOL = "/go-orbit-db/ipfs-direct-channel/1.0.0"

// Channel Channel is a pubsub used for a direct communication between peers
// new messages are received via events
const PROTOCOL = "/go-orbit-db/direct-channel/1.1.0"

type channel struct {
type directChannel struct {
events.EventEmitter
receiverID p2pcore.PeerID
logger *zap.Logger
holder *channelHolder
stream io.Writer
muStream sync.Mutex

logger *zap.Logger
host host.Host
peer peer.ID
}

func (c *channel) Send(ctx context.Context, bytes []byte) error {
c.muStream.Lock()
stream := c.stream
c.muStream.Unlock()
// Send Sends a message to the other peer
func (d *directChannel) Send(ctx context.Context, bytes []byte) error {
stream, err := d.host.NewStream(ctx, d.peer, PROTOCOL)
if err != nil {
return fmt.Errorf("unable to create stream: %w", err)
}

if stream == nil {
return fmt.Errorf("stream is not opened")
if len(bytes) > math.MaxUint16 {
return fmt.Errorf("payload is too large")
}

if len(bytes) > math.MaxUint16 {
Expand All @@ -55,164 +49,109 @@ func (c *channel) Send(ctx context.Context, bytes []byte) error {
return err
}

_, err := stream.Write(bytes)
_, err = stream.Write(bytes)
return err
}

func (c *channel) Close() error {
c.muStream.Lock()
c.stream = nil
c.muStream.Unlock()

return nil
}

func (c *channel) Connect(ctx context.Context) error {
var (
err error
stream network.Stream
id protocol.ID
)

if strings.Compare(c.holder.host.ID().String(), c.receiverID.String()) < 0 {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
id = c.holder.hostProtocolID(c.receiverID)
c.holder.muExpected.Lock()
streamChan := c.holder.expectedPID[id]
c.holder.muExpected.Unlock()

select {
case stream = <-streamChan:
// nothing else to do, stream is acquired
case <-ctx.Done():
cancel()
return fmt.Errorf("unable to create stream, err: %w", ctx.Err())
}

cancel()
} else {
id = c.holder.hostProtocolID(c.holder.host.ID())
stream, err = c.holder.host.NewStream(ctx, c.receiverID, id)
if err != nil {
return err
}
}

c.muStream.Lock()
c.stream = stream
c.muStream.Unlock()

go func() {
for {
if err := c.incomingEvent(ctx, stream); err == io.EOF {
return
} else if err != nil {
c.logger.Error("error while receiving event", zap.Error(err))
return
}
}
}()

return nil
}

func (c *channel) incomingEvent(ctx context.Context, stream network.Stream) error {
func (d *directChannel) handleNewPeer(s network.Stream) {
b := make([]byte, 2)
defer func() {
_ = s.Reset()
}()

if _, err := stream.Read(b); err != nil {
if _, err := s.Read(b); err != nil {
if err == io.EOF {
return err
return
}

c.logger.Error("unable to read", zap.Error(err))
return err
d.logger.Error("unable to read", zap.Error(err))
return
}

data := make([]byte, binary.LittleEndian.Uint16(b))
_, err := stream.Read(data)
_, err := s.Read(data)
if err != nil {
if err == io.EOF {
return err
return
}

c.logger.Error("unable to read", zap.Error(err))
return err
d.logger.Error("unable to read", zap.Error(err))
return
}

c.Emit(ctx, pubsub.NewEventPayload(data))
d.Emit(context.Background(), pubsub.NewEventPayload(data))
}

// @NOTE(gfanton): we dont need this on direct channel
// Connect Waits for the other peer to be connected
func (d *directChannel) Connect(ctx context.Context) (err error) {
return nil
}

type channelHolder struct {
expectedPID map[protocol.ID]chan network.Stream
host host.Host
muExpected sync.Mutex
// @NOTE(gfanton): we dont need this on direct channel
// Close Closes the connection
func (d *directChannel) Close() error {
return nil
}

func (h *channelHolder) NewChannel(ctx context.Context, receiver p2pcore.PeerID, opts *iface.DirectChannelOptions) (iface.DirectChannel, error) {
if opts == nil {
opts = &iface.DirectChannelOptions{}
}
type holderChannels struct {
muChannels sync.Mutex
channels map[peer.ID]*directChannel
host host.Host
logger *zap.Logger
}

if opts.Logger == nil {
opts.Logger = zap.NewNop()
}
func (c *holderChannels) incomingConnHandler(s network.Stream) {
remotepeer := s.Conn().RemotePeer()

ch := &channel{
receiverID: receiver,
logger: opts.Logger,
holder: h,
}

if strings.Compare(h.host.ID().String(), receiver.String()) < 0 {
id := h.hostProtocolID(receiver)
h.muExpected.Lock()
h.expectedPID[id] = make(chan network.Stream)
h.muExpected.Unlock()

go func() {
<-ctx.Done()
h.muExpected.Lock()
delete(h.expectedPID, id)
h.muExpected.Unlock()
}()
c.muChannels.Lock()
dc, ok := c.channels[remotepeer]
if !ok {
dc = &directChannel{
logger: c.logger,
host: c.host,
peer: remotepeer,
}
c.channels[remotepeer] = dc
}
c.muChannels.Unlock()

return ch, nil
go dc.handleNewPeer(s)
}

func (h *channelHolder) hostProtocolID(receiver p2pcore.PeerID) protocol.ID {
return protocol.ID(fmt.Sprintf("%s/%s", PROTOCOL, receiver.String()))
}

func (h *channelHolder) checkExpectedStream(s string) bool {
h.muExpected.Lock()
_, ok := h.expectedPID[protocol.ID(s)]
h.muExpected.Unlock()
func (c *holderChannels) NewChannel(ctx context.Context, receiver peer.ID, opts *iface.DirectChannelOptions) (iface.DirectChannel, error) {
if opts == nil {
opts = &iface.DirectChannelOptions{}
}

return ok
}
if opts.Logger == nil {
opts.Logger = c.logger
}

func (h *channelHolder) incomingConnHandler(stream network.Stream) {
h.muExpected.Lock()
ch, ok := h.expectedPID[stream.Protocol()]
h.muExpected.Unlock()
c.muChannels.Lock()
defer c.muChannels.Unlock()

if !ok {
return
if channel, ok := c.channels[receiver]; ok {
return channel, nil
}

ch <- stream
dc := &directChannel{
logger: c.logger,
host: c.host,
peer: receiver,
}
c.channels[receiver] = dc
return dc, nil
}

func InitDirectChannelFactory(host host.Host) iface.DirectChannelFactory {
holder := &channelHolder{
expectedPID: map[protocol.ID]chan network.Stream{},
host: host,
func InitDirectChannelFactory(logger *zap.Logger, host host.Host) iface.DirectChannelFactory {
holder := &holderChannels{
logger: logger,
channels: make(map[peer.ID]*directChannel),
host: host,
}

host.SetStreamHandlerMatch(PROTOCOL, holder.checkExpectedStream, holder.incomingConnHandler)
host.SetStreamHandler(PROTOCOL, holder.incomingConnHandler)

return holder.NewChannel
}
3 changes: 2 additions & 1 deletion pubsub/directchannel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestInitDirectChannelFactory(t *testing.T) {
Expand All @@ -32,7 +33,7 @@ func TestInitDirectChannelFactory(t *testing.T) {
hosts[i], err = mn.GenPeer()
require.NoError(t, err)

directChannelsFactories[i] = InitDirectChannelFactory(hosts[i])
directChannelsFactories[i] = InitDirectChannelFactory(zap.NewNop(), hosts[i])
}

err = mn.LinkAll()
Expand Down
2 changes: 1 addition & 1 deletion tests/replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func testDirectChannelNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (or

orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{
Directory: &dbPath1,
DirectChannelFactory: directchannel.InitDirectChannelFactory(node1.PeerHost),
DirectChannelFactory: directchannel.InitDirectChannelFactory(zap.NewNop(), node1.PeerHost),
})
require.NoError(t, err)

Expand Down

0 comments on commit 028fc94

Please sign in to comment.