Skip to content

Commit

Permalink
session: hold RLock for a shorter duration
Browse files Browse the repository at this point in the history
Refer to #170
  • Loading branch information
gnarula committed Jan 5, 2021
1 parent e1cc707 commit ad6fea6
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions mino/minogrpc/session/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io/ioutil"
"os"
"sync"
"time"

"github.com/rs/zerolog"
"go.dedis.ch/dela"
Expand Down Expand Up @@ -98,6 +99,22 @@ type parent struct {
table router.RoutingTable
}

type copyContext struct {
context.Context
}

func (c copyContext) Deadline() (time.Time, bool) {
return time.Time{}, false
}

func (c copyContext) Done() <-chan struct{} {
return nil
}

func (c copyContext) Err() error {
return nil
}

// session is a participant to a stream protocol which has a parent gateway that
// determines when to close, and it can open further relays to distant peers if
// the routing table requires it.
Expand Down Expand Up @@ -230,14 +247,20 @@ func (s *session) RecvPacket(from mino.Address, p *ptypes.Packet) (*ptypes.Ack,
}

s.parentsLock.RLock()
defer s.parentsLock.RUnlock()
parents := make([]*parent, len(s.parents))
i := 0
for _, parent := range s.parents {
parents[i] = &parent
i++
}
s.parentsLock.RUnlock()

// Try to send the packet to each parent until one works.
for _, parent := range s.parents {
for _, parent := range parents {
s.traffic.LogRecv(parent.relay.Stream().Context(), from, pkt)

errs := make(chan error, len(pkt.GetDestination()))
sent := s.sendPacket(parent, pkt, errs)
sent := s.sendPacket(*parent, pkt, errs)
close(errs)

if sent {
Expand Down Expand Up @@ -276,12 +299,18 @@ func (s *session) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
}

s.parentsLock.RLock()
defer s.parentsLock.RUnlock()

parents := make([]*parent, len(s.parents))
i := 0
for _, parent := range s.parents {
parents[i] = &parent
i++
}
s.parentsLock.RUnlock()

for _, parent := range parents {
packet := parent.table.Make(s.me, addrs, data)

sent := s.sendPacket(parent, packet, errs)
sent := s.sendPacket(*parent, packet, errs)
if sent {
return
}
Expand Down Expand Up @@ -372,7 +401,11 @@ func (s *session) sendTo(p parent, to mino.Address, pkt router.Packet, errs chan
}
}

ctx := p.relay.Stream().Context()
// Refer #170. Removing this because there's a chance that we're using
// a parent that is not valid anymore. We do however need to copy all the
// values stored in the context
// ctx := p.relay.Stream().Context()
ctx := copyContext{p.relay.Stream().Context()}

s.traffic.LogSend(ctx, relay.GetDistantAddress(), pkt)

Expand Down Expand Up @@ -428,7 +461,12 @@ func (s *session) setupRelay(p parent, addr mino.Address) (Relay, error) {
md := s.md.Copy()
md.Set(HandshakeKey, string(hs))

ctx := metadata.NewOutgoingContext(p.relay.Stream().Context(), md)
// Refer #170. Removing this because there's a chance that we're using
// a parent that is not valid anymore. We do however need to copy all the
// values stored in the context
// ctx := metadata.NewOutgoingContext(p.relay.Stream().Context(), md)
ctxCopy := copyContext{p.relay.Stream().Context()}
ctx := metadata.NewOutgoingContext(ctxCopy, md)

cl := ptypes.NewOverlayClient(conn)

Expand Down

1 comment on commit ad6fea6

@nkcr
Copy link
Contributor

@nkcr nkcr commented on ad6fea6 Jan 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I think we can only use context.Background instead of copying it.
  • to copy a map you can use append and initialize the slice with the right size:
		parents := make([]parent, 0, len(s.parents))
		for _, p := range s.parents {
			parents = append(parents, p)
		}

Now that we're not giving the parent's context anymore we should ask ourselves: how do we notify nodes that the connections should be closed ?

Please sign in to comment.