Skip to content

Commit

Permalink
Merge pull request #2440 from iotaledger/discardable-pipes
Browse files Browse the repository at this point in the history
Discard a pipe on consensus completion.
  • Loading branch information
muXxer authored May 8, 2023
2 parents 734c9f0 + fef2310 commit 6a6675c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
6 changes: 4 additions & 2 deletions packages/chain/cons/cons_gr/gr.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,12 @@ func New(
constInstRaw := cons.New(chainID, chainStore, me, myNodeIdentity.GetPrivateKey(), dkShare, procCache, netPeeringID[:], gpa.NodeIDFromPublicKey, log).AsGPA()
cgr.consInst = gpa.NewAckHandler(me, constInstRaw, redeliveryPeriod)

netRecvPipeInCh := cgr.netRecvPipe.In()
unhook := net.Attach(&netPeeringID, peering.PeerMessageReceiverChainCons, func(recv *peering.PeerMessageIn) {
if recv.MsgType != msgTypeCons {
cgr.log.Warnf("Unexpected message, type=%v", recv.MsgType)
return
}
netRecvPipeInCh <- recv
cgr.netRecvPipe.TryAdd(recv)
})
cgr.netDisconnect = unhook

Expand Down Expand Up @@ -202,6 +201,9 @@ func (cgr *ConsGr) Time(t time.Time) {

func (cgr *ConsGr) run() { //nolint:gocyclo,funlen
defer util.ExecuteIfNotNil(cgr.netDisconnect)
defer func() {
cgr.netRecvPipe.Discard()
}()

ctxClose := cgr.ctx.Done()
netRecvPipeOutCh := cgr.netRecvPipe.Out()
Expand Down
2 changes: 2 additions & 0 deletions packages/util/pipe/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ type Pipe[E any] interface {
Out() <-chan E
Len() int
Close()
Discard()
TryAdd(e E) bool
}
47 changes: 39 additions & 8 deletions packages/util/pipe/pipe.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package pipe

import "sync"

// InfinitePipe provides deserialised sender and receiver: it queues messages
// sent by the sender and returns them to the receiver whenever it is ready,
// without blocking the sender process. Depending on the backing queue, the pipe
// might have other characteristics.
type InfinitePipe[E any] struct {
input chan E
output chan E
length chan int
buffer Queue[E]
input chan E
output chan E
length chan int
buffer Queue[E]
discardCh chan struct{}
closeLock *sync.RWMutex
closed bool
}

var _ Pipe[Hashable] = &InfinitePipe[Hashable]{}
Expand Down Expand Up @@ -47,10 +52,13 @@ func NewLimitPriorityHashInfinitePipe[E Hashable](priorityFun func(E) bool, limi

func newInfinitePipe[E any](queue Queue[E]) *InfinitePipe[E] {
ch := &InfinitePipe[E]{
input: make(chan E),
output: make(chan E),
length: make(chan int),
buffer: queue,
input: make(chan E),
output: make(chan E),
length: make(chan int),
buffer: queue,
discardCh: make(chan struct{}),
closeLock: &sync.RWMutex{},
closed: false,
}
go ch.infiniteBuffer()
return ch
Expand All @@ -69,7 +77,25 @@ func (ch *InfinitePipe[E]) Len() int {
}

func (ch *InfinitePipe[E]) Close() {
ch.closeLock.Lock()
defer ch.closeLock.Unlock()
close(ch.input)
ch.closed = true
}

func (ch *InfinitePipe[E]) Discard() {
ch.Close()
close(ch.discardCh)
}

func (ch *InfinitePipe[E]) TryAdd(e E) bool {
ch.closeLock.RLock()
defer ch.closeLock.RUnlock()
if ch.closed {
return false
}
ch.In() <- e
return true
}

func (ch *InfinitePipe[E]) infiniteBuffer() {
Expand All @@ -89,6 +115,11 @@ func (ch *InfinitePipe[E]) infiniteBuffer() {
case output <- next:
ch.buffer.Remove()
case ch.length <- ch.buffer.Length():
case <-ch.discardCh:
// Close the pipe without waiting for the values to be consumed.
close(ch.output)
close(ch.length)
return
}

if ch.buffer.Length() > 0 {
Expand Down

0 comments on commit 6a6675c

Please sign in to comment.