From 605ba38d89381ac14c6f0c242efea2ee83b8f583 Mon Sep 17 00:00:00 2001 From: Eugene Siegel Date: Mon, 14 Oct 2024 19:50:05 -0400 Subject: [PATCH 1/3] graph: change ValidationBarrier usage in the builder code This omits calls to InitJobDependencies, SignalDependants, and WaitForDependants. These changes have been made here because the router / builder code does not actually need job dependency management. Calls to the builder code (i.e. AddNode, AddEdge, UpdateEdge) are all blocking in the gossiper. This, combined with the fact that child jobs are run after parent jobs in the gossiper, means that the calls to the router will happen in the proper dependency order. --- graph/builder.go | 40 ++++------------------------- graph/validation_barrier.go | 51 ------------------------------------- 2 files changed, 5 insertions(+), 86 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index d6984af709..a2908a0f1b 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -675,45 +675,20 @@ func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier, defer b.wg.Done() defer vb.CompleteJob() - // If this message has an existing dependency, then we'll wait until - // that has been fully validated before we proceed. - err := vb.WaitForDependants(update.msg) - if err != nil { - switch { - case IsError(err, ErrVBarrierShuttingDown): - update.err <- err - - case IsError(err, ErrParentValidationFailed): - update.err <- NewErrf(ErrIgnored, err.Error()) //nolint - - default: - log.Warnf("unexpected error during validation "+ - "barrier shutdown: %v", err) - update.err <- err - } - - return - } - // Process the routing update to determine if this is either a new // update from our PoV or an update to a prior vertex/edge we // previously accepted. - err = b.processUpdate(update.msg, update.op...) + err := b.processUpdate(update.msg, update.op...) update.err <- err - // If this message had any dependencies, then we can now signal them to - // continue. - allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated) - vb.SignalDependants(update.msg, allowDependents) + notError := err == nil || IsError(err, ErrIgnored, ErrOutdated) // If the error is not nil here, there's no need to send topology // change. if err != nil { - // We now decide to log an error or not. If allowDependents is - // false, it means there is an error and the error is neither - // ErrIgnored or ErrOutdated. In this case, we'll log an error. - // Otherwise, we'll add debug log only. - if allowDependents { + // Log as a debug message if this is not an error we need to be + // concerned about. + if notError { log.Debugf("process network updates got: %v", err) } else { log.Errorf("process network updates got: %v", err) @@ -789,11 +764,6 @@ func (b *Builder) networkHandler() { // result we'll modify the channel graph accordingly depending // on the exact type of the message. case update := <-b.networkUpdates: - // We'll set up any dependants, and wait until a free - // slot for this job opens up, this allows us to not - // have thousands of goroutines active. 
- validationBarrier.InitJobDependencies(update.msg) - b.wg.Add(1) go b.handleNetworkUpdate(validationBarrier, update) diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go index a97709605e..a5a8da8bb6 100644 --- a/graph/validation_barrier.go +++ b/graph/validation_barrier.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -125,33 +124,14 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals } - case *models.ChannelEdgeInfo: - - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - if _, ok := v.chanAnnFinSignal[shortID]; !ok { - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[shortID] = signals - v.chanEdgeDependencies[shortID] = signals - - v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals - } // These other types don't have any dependants, so no further // initialization needs to be done beyond just occupying a job slot. - case *models.ChannelEdgePolicy: - return case *lnwire.ChannelUpdate1: return case *lnwire.NodeAnnouncement: // TODO(roasbeef): node ann needs to wait on existing channel updates return - case *models.LightningNode: - return case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? return @@ -187,20 +167,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { switch msg := job.(type) { // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the // completion of any active ChannelAnnouncement jobs related to them. - case *models.ChannelEdgePolicy: - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - signals, ok = v.chanEdgeDependencies[shortID] - - jobDesc = fmt.Sprintf("job=lnwire.ChannelEdgePolicy, scid=%v", - msg.ChannelID) - - case *models.LightningNode: - vertex := route.Vertex(msg.PubKeyBytes) - signals, ok = v.nodeAnnDependencies[vertex] - - jobDesc = fmt.Sprintf("job=channeldb.LightningNode, pub=%s", - vertex) - case *lnwire.ChannelUpdate1: signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] @@ -217,7 +183,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // return directly. case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? - case *models.ChannelEdgeInfo: case *lnwire.ChannelAnnouncement1: } @@ -263,17 +228,6 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { // If we've just finished executing a ChannelAnnouncement, then we'll // close out the signal, and remove the signal from the map of active // ones. This will allow/deny any dependent jobs to continue execution. 
- case *models.ChannelEdgeInfo:
- shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
- finSignals, ok := v.chanAnnFinSignal[shortID]
- if ok {
- if allow {
- close(finSignals.allow)
- } else {
- close(finSignals.deny)
- }
- delete(v.chanAnnFinSignal, shortID)
- }
 case *lnwire.ChannelAnnouncement1:
 finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID]
 if ok {
@@ -290,15 +244,10 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) {
 // For all other job types, we'll delete the tracking entries from the
 // map, as if we reach this point, then all dependants have already
 // finished executing and we can proceed.
- case *models.LightningNode:
- delete(v.nodeAnnDependencies, route.Vertex(msg.PubKeyBytes))
 case *lnwire.NodeAnnouncement:
 delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID))
 case *lnwire.ChannelUpdate1:
 delete(v.chanEdgeDependencies, msg.ShortChannelID)
- case *models.ChannelEdgePolicy:
- shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
- delete(v.chanEdgeDependencies, shortID)
 case *lnwire.AnnounceSignatures1:
 return

From b6759aa65286879bd9c26286d2f491e01fd67f02 Mon Sep 17 00:00:00 2001
From: Eugene Siegel
Date: Fri, 1 Nov 2024 15:13:24 -0400
Subject: [PATCH 2/3] discovery+graph: track job set dependencies in ValidationBarrier

This commit does two things:
- removes the concept of allow / deny. Having this in place was a minor
  optimization and removing it makes the solution simpler.
- changes the job dependency tracking to track sets of abstract parent
  jobs rather than individual parent jobs.

As a note, the purpose of the ValidationBarrier is that it allows us to
launch gossip validation jobs in goroutines while still ensuring that the
validation order of these goroutines is adhered to when it comes to
validating ChannelAnnouncement _before_ ChannelUpdate and _before_
NodeAnnouncement.
---
 discovery/gossiper.go | 36 ++-
 go.mod | 2 +-
 go.sum | 4 +-
 graph/validation_barrier.go | 441 ++++++++++++++++++++++---------
 graph/validation_barrier_test.go | 158 +++++++++--
 5 files changed, 493 insertions(+), 148 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 9c51734396..868e8fee0a 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -512,6 +512,9 @@ type AuthenticatedGossiper struct {
 // AuthenticatedGossiper lock.
 chanUpdateRateLimiter map[uint64][2]*rate.Limiter

+ // vb is used to enforce job dependency ordering of gossip messages.
+ vb *graph.ValidationBarrier
+
 sync.Mutex
}

@@ -537,6 +540,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
 banman: newBanman(),
 }

+ gossiper.vb = graph.NewValidationBarrier(1000, gossiper.quit)
+
 gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
 ChainHash: cfg.ChainHash,
 ChanSeries: cfg.ChanSeries,
@@ -1398,10 +1403,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
 log.Errorf("Unable to rebroadcast stale announcements: %v", err)
 }

- // We'll use this validation to ensure that we process jobs in their
- // dependency order during parallel validation.
- validationBarrier := graph.NewValidationBarrier(1000, d.quit)
-
 for {
 select {
 // A new policy update has arrived. We'll commit it to the
@@ -1470,11 +1471,17 @@ func (d *AuthenticatedGossiper) networkHandler() {
 // We'll set up any dependent, and wait until a free
 // slot for this job opens up, this allow us to not
 // have thousands of goroutines active.
- validationBarrier.InitJobDependencies(announcement.msg) + annJobID, err := d.vb.InitJobDependencies( + announcement.msg, + ) + if err != nil { + announcement.err <- err + continue + } d.wg.Add(1) go d.handleNetworkMessages( - announcement, &announcements, validationBarrier, + announcement, &announcements, annJobID, ) // The trickle timer has ticked, which indicates we should @@ -1525,10 +1532,10 @@ func (d *AuthenticatedGossiper) networkHandler() { // // NOTE: must be run as a goroutine. func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, - deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) { + deDuped *deDupedAnnouncements, jobID graph.JobID) { defer d.wg.Done() - defer vb.CompleteJob() + defer d.vb.CompleteJob() // We should only broadcast this message forward if it originated from // us or it wasn't received as part of our initial historical sync. @@ -1536,7 +1543,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message has an existing dependency, then we'll wait until // that has been fully validated before we proceed. - err := vb.WaitForDependants(nMsg.msg) + err := d.vb.WaitForParents(jobID, nMsg.msg) if err != nil { log.Debugf("Validating network message %s got err: %v", nMsg.msg.MsgType(), err) @@ -1566,7 +1573,16 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message had any dependencies, then we can now signal them to // continue. - vb.SignalDependants(nMsg.msg, allow) + err = d.vb.SignalDependents(nMsg.msg, jobID) + if err != nil { + // Something is wrong if SignalDependents returns an error. + log.Errorf("SignalDependents returned error for msg=%v with "+ + "JobID=%v", spew.Sdump(nMsg.msg), jobID) + + nMsg.err <- err + + return + } // If the announcement was accepted, then add the emitted announcements // to our announce batch to be broadcast once the trickle timer ticks diff --git a/go.mod b/go.mod index 7680509fd3..adf765cfbd 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb github.com/lightningnetwork/lnd/cert v1.2.2 github.com/lightningnetwork/lnd/clock v1.1.1 - github.com/lightningnetwork/lnd/fn/v2 v2.0.2 + github.com/lightningnetwork/lnd/fn/v2 v2.0.7 github.com/lightningnetwork/lnd/healthcheck v1.2.6 github.com/lightningnetwork/lnd/kvdb v1.4.12 github.com/lightningnetwork/lnd/queue v1.1.1 diff --git a/go.sum b/go.sum index 5ed9cd2046..74a6fcafb7 100644 --- a/go.sum +++ b/go.sum @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U= github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0= github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ= -github.com/lightningnetwork/lnd/fn/v2 v2.0.2 h1:M7o2lYrh/zCp+lntPB3WP/rWTu5U+4ssyHW+kqNJ0fs= -github.com/lightningnetwork/lnd/fn/v2 v2.0.2/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= +github.com/lightningnetwork/lnd/fn/v2 v2.0.7 h1:2LkgcGk20vXcUJyrlYLWMptnEouOBnCixskMsQW+GxU= +github.com/lightningnetwork/lnd/fn/v2 v2.0.7/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= 
github.com/lightningnetwork/lnd/kvdb v1.4.12 h1:Y0WY5Tbjyjn6eCYh068qkWur5oFtioJlfxc8w5SlJeQ=
diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go
index a5a8da8bb6..9d20bcb578 100644
--- a/graph/validation_barrier.go
+++ b/graph/validation_barrier.go
@@ -3,29 +3,34 @@ package graph
 import (
 "fmt"
 "sync"
+ "sync/atomic"

+ "github.com/go-errors/errors"
+ "github.com/lightningnetwork/lnd/fn/v2"
 "github.com/lightningnetwork/lnd/lnwire"
 "github.com/lightningnetwork/lnd/routing/route"
 )

-// validationSignals contains two signals which allows the ValidationBarrier to
-// communicate back to the caller whether a dependent should be processed or not
-// based on whether its parent was successfully validated. Only one of these
-// signals is to be used at a time.
-type validationSignals struct {
- // allow is the signal used to allow a dependent to be processed.
- allow chan struct{}
+// JobID identifies an active job in the validation barrier. It is large so
+// that we don't need to worry about overflows.
+type JobID uint64

- // deny is the signal used to prevent a dependent from being processed.
- deny chan struct{}
+// jobInfo stores job dependency info for a set of dependent gossip messages.
+type jobInfo struct {
+ // activeParentJobIDs is the set of active parent job ids.
+ activeParentJobIDs fn.Set[JobID]
+
+ // activeDependentJobs is the set of active dependent job ids.
+ activeDependentJobs fn.Set[JobID]
 }

-// ValidationBarrier is a barrier used to ensure proper validation order while
-// concurrently validating new announcements for channel edges, and the
-// attributes of channel edges. It uses this set of maps (protected by this
-// mutex) to track validation dependencies. For a given channel our
-// dependencies look like this: chanAnn <- chanUp <- nodeAnn. That is we must
-// validate the item on the left of the arrow before that on the right.
+// ValidationBarrier is a barrier used to enforce a strict validation order
+// while concurrently validating other updates for channel edges. It uses a set
+// of maps to track validation dependencies. This is needed in practice because
+// gossip messages for a given channel may arrive in order, but then due to
+// scheduling in different goroutines, may be validated in the wrong order.
+// With the ValidationBarrier, the dependent update will wait until the parent
+// update completes.
 type ValidationBarrier struct {
 // validationSemaphore is a channel of structs which is used as a
 // semaphore. Initially we'll fill this with a buffered channel of the
 // from this channel, then restore the value upon completion.
 validationSemaphore chan struct{}

- // chanAnnFinSignal is map that keep track of all the pending
- // ChannelAnnouncement like validation job going on. Once the job has
- // been completed, the channel will be closed unblocking any
- // dependants.
- chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals
+ // jobInfoMap stores the set of job ids for each channel.
+ // NOTE: This MUST be used with the mutex.
+ // NOTE: This currently stores string representations of
+ // lnwire.ShortChannelID and route.Vertex. Since these are of different
+ // lengths, collision cannot occur in their string representations.
+ // N.B.: Check that any new string-converted types don't collide with
+ // existing string-converted types.
+ jobInfoMap map[string]*jobInfo
+
+ // jobDependencies is a mapping from a child's JobID to the set of
+ // parent JobID that it depends on.
+ // NOTE: This MUST be used with the mutex. + jobDependencies map[JobID]fn.Set[JobID] - // chanEdgeDependencies tracks any channel edge updates which should - // wait until the completion of the ChannelAnnouncement before - // proceeding. This is a dependency, as we can't validate the update - // before we validate the announcement which creates the channel - // itself. - chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals + // childJobChans stores the notification channel that each child job + // listens on for parent job completions. + // NOTE: This MUST be used with the mutex. + childJobChans map[JobID]chan struct{} - // nodeAnnDependencies tracks any pending NodeAnnouncement validation - // jobs which should wait until the completion of the - // ChannelAnnouncement before proceeding. - nodeAnnDependencies map[route.Vertex]*validationSignals + // idCtr is an atomic integer that is used to assign JobIDs. + idCtr atomic.Uint64 quit chan struct{} sync.Mutex @@ -62,10 +71,10 @@ func NewValidationBarrier(numActiveReqs int, quitChan chan struct{}) *ValidationBarrier { v := &ValidationBarrier{ - chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals), - chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals), - nodeAnnDependencies: make(map[route.Vertex]*validationSignals), - quit: quitChan, + jobInfoMap: make(map[string]*jobInfo), + jobDependencies: make(map[JobID]fn.Set[JobID]), + childJobChans: make(map[JobID]chan struct{}), + quit: quitChan, } // We'll first initialize a set of semaphores to limit our concurrency @@ -80,7 +89,9 @@ func NewValidationBarrier(numActiveReqs int, // InitJobDependencies will wait for a new job slot to become open, and then // sets up any dependent signals/trigger for the new job -func (v *ValidationBarrier) InitJobDependencies(job interface{}) { +func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID, + error) { + // We'll wait for either a new slot to become open, or for the quit // channel to be closed. select { @@ -91,50 +102,104 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { v.Lock() defer v.Unlock() + // updateOrCreateJobInfo modifies the set of activeParentJobs for this + // annID and updates jobInfoMap. + updateOrCreateJobInfo := func(annID string, annJobID JobID) { + info, ok := v.jobInfoMap[annID] + if ok { + // If an entry already exists for annID, then a job + // related to it is being validated. Add to the set of + // parent job ids. This addition will only affect + // _later_, _child_ jobs for the annID. + info.activeParentJobIDs.Add(annJobID) + return + } + + // No entry exists for annID, meaning that we should create + // one. + parentJobSet := fn.NewSet(annJobID) + + info = &jobInfo{ + activeParentJobIDs: parentJobSet, + activeDependentJobs: fn.NewSet[JobID](), + } + v.jobInfoMap[annID] = info + } + + // populateDependencies populates the job dependency mappings (i.e. + // which should complete after another) for the (annID, childJobID) + // tuple. + populateDependencies := func(annID string, childJobID JobID) { + // If there is no entry in the jobInfoMap, we don't have to + // wait on any parent jobs to finish. + info, ok := v.jobInfoMap[annID] + if !ok { + return + } + + // We want to see a snapshot of active parent jobs for this + // annID that are already registered in activeParentJobIDs. The + // child job identified by childJobID can only run after these + // parent jobs have run. 
After grabbing the snapshot, we then + // want to persist a slice of these jobs. + + // Create the notification chan that parent jobs will send (or + // close) on when they complete. + jobChan := make(chan struct{}) + + // Add to set of activeDependentJobs for this annID. + info.activeDependentJobs.Add(childJobID) + + // Store in childJobChans. The parent jobs will fetch this chan + // to notify on. The child job will later fetch this chan to + // listen on when WaitForParents is called. + v.childJobChans[childJobID] = jobChan + + // Copy over the parent job IDs at this moment for this annID. + // This job must be processed AFTER those parent IDs. + parentJobs := info.activeParentJobIDs.Copy() + + // Populate the jobDependencies mapping. + v.jobDependencies[childJobID] = parentJobs + } + // Once a slot is open, we'll examine the message of the job, to see if // there need to be any dependent barriers set up. switch msg := job.(type) { - - // If this is a channel announcement, then we'll need to set up den - // tenancies, as we'll need to verify this before we verify any - // ChannelUpdates for the same channel, or NodeAnnouncements of nodes - // that are involved in this channel. This goes for both the wire - // type,s and also the types that we use within the database. case *lnwire.ChannelAnnouncement1: + id := JobID(v.idCtr.Add(1)) - // We ensure that we only create a new announcement signal iff, - // one doesn't already exist, as there may be duplicate - // announcements. We'll close this signal once the - // ChannelAnnouncement has been validated. This will result in - // all the dependent jobs being unlocked so they can finish - // execution themselves. - if _, ok := v.chanAnnFinSignal[msg.ShortChannelID]; !ok { - // We'll create the channel that we close after we - // validate this announcement. All dependants will - // point to this same channel, so they'll be unblocked - // at the same time. - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[msg.ShortChannelID] = signals - v.chanEdgeDependencies[msg.ShortChannelID] = signals + updateOrCreateJobInfo(msg.ShortChannelID.String(), id) + updateOrCreateJobInfo(route.Vertex(msg.NodeID1).String(), id) + updateOrCreateJobInfo(route.Vertex(msg.NodeID2).String(), id) - v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals - } + return id, nil - // These other types don't have any dependants, so no further - // initialization needs to be done beyond just occupying a job slot. + // Populate the dependency mappings for the below child jobs. case *lnwire.ChannelUpdate1: - return + childJobID := JobID(v.idCtr.Add(1)) + populateDependencies(msg.ShortChannelID.String(), childJobID) + + return childJobID, nil case *lnwire.NodeAnnouncement: - // TODO(roasbeef): node ann needs to wait on existing channel updates - return + childJobID := JobID(v.idCtr.Add(1)) + populateDependencies( + route.Vertex(msg.NodeID).String(), childJobID, + ) + + return childJobID, nil case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? - return + // - We can do the above by calling populateDependencies. For + // now, while we evaluate potential side effects, don't do + // anything with childJobID and just return it. + childJobID := JobID(v.idCtr.Add(1)) + return childJobID, nil + + default: + // An invalid message was passed into InitJobDependencies. + // Return an error. 
+ return JobID(0), errors.New("invalid message") } } @@ -149,16 +214,21 @@ func (v *ValidationBarrier) CompleteJob() { } } -// WaitForDependants will block until any jobs that this job dependants on have -// finished executing. This allows us a graceful way to schedule goroutines -// based on any pending uncompleted dependent jobs. If this job doesn't have an -// active dependent, then this function will return immediately. -func (v *ValidationBarrier) WaitForDependants(job interface{}) error { +// WaitForParents will block until all parent job dependencies have went +// through the validation pipeline. This allows us a graceful way to run jobs +// in goroutines and still have strict ordering guarantees. If this job doesn't +// have any parent job dependencies, then this function will return +// immediately. +func (v *ValidationBarrier) WaitForParents(childJobID JobID, + job interface{}) error { var ( - signals *validationSignals ok bool jobDesc string + + parentJobIDs fn.Set[JobID] + annID string + jobChan chan struct{} ) // Acquire a lock to read ValidationBarrier. @@ -168,88 +238,221 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the // completion of any active ChannelAnnouncement jobs related to them. case *lnwire.ChannelUpdate1: - signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] + annID = msg.ShortChannelID.String() + + parentJobIDs, ok = v.jobDependencies[childJobID] + if !ok { + // If ok is false, it means that this child job never + // had any parent jobs to wait on. + v.Unlock() + return nil + } jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v", msg.ShortChannelID.ToUint64()) case *lnwire.NodeAnnouncement: - vertex := route.Vertex(msg.NodeID) - signals, ok = v.nodeAnnDependencies[vertex] + annID = route.Vertex(msg.NodeID).String() + + parentJobIDs, ok = v.jobDependencies[childJobID] + if !ok { + // If ok is false, it means that this child job never + // had any parent jobs to wait on. + v.Unlock() + return nil + } + jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s", - vertex) + route.Vertex(msg.NodeID)) // Other types of jobs can be executed immediately, so we'll just // return directly. case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? + v.Unlock() + return nil + case *lnwire.ChannelAnnouncement1: + v.Unlock() + return nil } // Release the lock once the above read is finished. v.Unlock() - // If it's not ok, it means either the job is not a dependent type, or - // it doesn't have a dependency signal. Either way, we can return - // early. + log.Debugf("Waiting for dependent on %s", jobDesc) + + v.Lock() + jobChan, ok = v.childJobChans[childJobID] if !ok { + v.Unlock() + + // The entry may not exist because this job does not depend on + // any parent jobs. return nil } + v.Unlock() - log.Debugf("Waiting for dependent on %s", jobDesc) - - // If we do have an active job, then we'll wait until either the signal - // is closed, or the set of jobs exits. - select { - case <-v.quit: - return NewErrf(ErrVBarrierShuttingDown, - "validation barrier shutting down") + for { + select { + case <-v.quit: + return NewErrf(ErrVBarrierShuttingDown, + "validation barrier shutting down") + + case <-jobChan: + // Every time this is sent on or if it's closed, a + // parent job has finished. 
The parent jobs have to + // also potentially close the channel because if all + // the parent jobs finish and call SignalDependents + // before the goroutine running WaitForParents has a + // chance to grab the notification chan from + // childJobChans, then the running goroutine will wait + // here for a notification forever. By having the last + // parent job close the notificiation chan, we avoid + // this issue. + + // Check and see if we have any parent jobs left. If we + // don't, we can finish up. + v.Lock() + info, found := v.jobInfoMap[annID] + if !found { + v.Unlock() + + // No parent job info found, proceed with + // validation. + return nil + } - case <-signals.deny: - log.Debugf("Signal deny for %s", jobDesc) - return NewErrf(ErrParentValidationFailed, - "parent validation failed") + x := parentJobIDs.Intersect(info.activeParentJobIDs) + v.Unlock() + if x.IsEmpty() { + // The parent jobs have all completed. We can + // proceed with validation. + return nil + } - case <-signals.allow: - log.Tracef("Signal allow for %s", jobDesc) - return nil + // If we've reached this point, we are still waiting on + // a parent job to complete. + } } } -// SignalDependants will allow/deny any jobs that are dependent on this job that -// they can continue execution. If the job doesn't have any dependants, then -// this function sill exit immediately. -func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { +// SignalDependents signals to any child jobs that this parent job has +// finished. +func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) error { v.Lock() defer v.Unlock() - switch msg := job.(type) { + // removeJob either removes a child job or a parent job. If it is + // removing a child job, then it removes the child's JobID from the set + // of dependent jobs for the announcement ID. If this is removing a + // parent job, then it removes the parentJobID from the set of active + // parent jobs and notifies the child jobs that it has finished + // validating. + removeJob := func(annID string, id JobID, child bool) error { + if child { + // If we're removing a child job, check jobInfoMap and + // remove this job from activeDependentJobs. + info, ok := v.jobInfoMap[annID] + if ok { + info.activeDependentJobs.Remove(id) + } - // If we've just finished executing a ChannelAnnouncement, then we'll - // close out the signal, and remove the signal from the map of active - // ones. This will allow/deny any dependent jobs to continue execution. - case *lnwire.ChannelAnnouncement1: - finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] - if ok { - if allow { - close(finSignals.allow) - } else { - close(finSignals.deny) + // Remove the notification chan from childJobChans. + delete(v.childJobChans, id) + + // Remove this job's dependency mapping. + delete(v.jobDependencies, id) + + return nil + } + + // Otherwise, we are removing a parent job. + jobInfo, found := v.jobInfoMap[annID] + if !found { + // NOTE: Some sort of consistency guarantee has been + // broken. + return fmt.Errorf("no job info found for "+ + "identifier(%v)", id) + } + + jobInfo.activeParentJobIDs.Remove(id) + + lastJob := jobInfo.activeParentJobIDs.IsEmpty() + + // Notify all dependent jobs that a parent job has completed. + for child := range jobInfo.activeDependentJobs { + notifyChan, ok := v.childJobChans[child] + if !ok { + // NOTE: Some sort of consistency guarantee has + // been broken. 
+ return fmt.Errorf("no job info found for "+ + "identifier(%v)", id) + } + + // We don't want to block when sending out the signal. + select { + case notifyChan <- struct{}{}: + default: + } + + // If this is the last parent job for this annID, also + // close the channel. This is needed because it's + // possible that the parent job cleans up the job + // mappings before the goroutine handling the child job + // has a chance to call WaitForParents and catch the + // signal sent above. We are allowed to close because + // no other parent job will be able to send along the + // channel (or close) as we're removing the entry from + // the jobInfoMap below. + if lastJob { + close(notifyChan) } - delete(v.chanAnnFinSignal, msg.ShortChannelID) } - delete(v.chanEdgeDependencies, msg.ShortChannelID) + // Remove from jobInfoMap if last job. + if lastJob { + delete(v.jobInfoMap, annID) + } + + return nil + } + + switch msg := job.(type) { + case *lnwire.ChannelAnnouncement1: + // Signal to the child jobs that parent validation has + // finished. We have to call removeJob for each annID + // that this ChannelAnnouncement can be associated with. + err := removeJob(msg.ShortChannelID.String(), id, false) + if err != nil { + return err + } + + err = removeJob(route.Vertex(msg.NodeID1).String(), id, false) + if err != nil { + return err + } + + err = removeJob(route.Vertex(msg.NodeID2).String(), id, false) + if err != nil { + return err + } + + return nil - // For all other job types, we'll delete the tracking entries from the - // map, as if we reach this point, then all dependants have already - // finished executing and we can proceed. case *lnwire.NodeAnnouncement: - delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID)) + // Remove child job info. + return removeJob(route.Vertex(msg.NodeID).String(), id, true) + case *lnwire.ChannelUpdate1: - delete(v.chanEdgeDependencies, msg.ShortChannelID) + // Remove child job info. + return removeJob(msg.ShortChannelID.String(), id, true) case *lnwire.AnnounceSignatures1: - return + // No dependency mappings are stored for AnnounceSignatures1, + // so do nothing. + return nil } + + return errors.New("invalid message - no job dependencies") } diff --git a/graph/validation_barrier_test.go b/graph/validation_barrier_test.go index 38fc7a0870..e6224d7f0e 100644 --- a/graph/validation_barrier_test.go +++ b/graph/validation_barrier_test.go @@ -7,6 +7,7 @@ import ( "github.com/lightningnetwork/lnd/graph" "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" ) // TestValidationBarrierSemaphore checks basic properties of the validation @@ -74,23 +75,31 @@ func TestValidationBarrierQuit(t *testing.T) { // Create a set of unique channel announcements that we will prep for // validation. anns := make([]*lnwire.ChannelAnnouncement1, 0, numTasks) + parentJobIDs := make([]graph.JobID, 0, numTasks) for i := 0; i < numTasks; i++ { anns = append(anns, &lnwire.ChannelAnnouncement1{ ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), NodeID1: nodeIDFromInt(uint64(2 * i)), NodeID2: nodeIDFromInt(uint64(2*i + 1)), }) - barrier.InitJobDependencies(anns[i]) + parentJobID, err := barrier.InitJobDependencies(anns[i]) + require.NoError(t, err) + + parentJobIDs = append(parentJobIDs, parentJobID) } // Create a set of channel updates, that must wait until their // associated channel announcement has been verified. 
chanUpds := make([]*lnwire.ChannelUpdate1, 0, numTasks) + childJobIDs := make([]graph.JobID, 0, numTasks) for i := 0; i < numTasks; i++ { chanUpds = append(chanUpds, &lnwire.ChannelUpdate1{ ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), }) - barrier.InitJobDependencies(chanUpds[i]) + childJob, err := barrier.InitJobDependencies(chanUpds[i]) + require.NoError(t, err) + + childJobIDs = append(childJobIDs, childJob) } // Spawn additional tasks that will send the error returned after @@ -100,7 +109,9 @@ func TestValidationBarrierQuit(t *testing.T) { jobErrs := make(chan error) for i := 0; i < numTasks; i++ { go func(ii int) { - jobErrs <- barrier.WaitForDependants(chanUpds[ii]) + jobErrs <- barrier.WaitForParents( + childJobIDs[ii], chanUpds[ii], + ) }(i) } @@ -117,14 +128,12 @@ func TestValidationBarrierQuit(t *testing.T) { // with the correct error. for i := 0; i < numTasks; i++ { switch { - // Signal completion for the first half of tasks, but only allow - // dependents to be processed as well for the second quarter. - case i < numTasks/4: - barrier.SignalDependants(anns[i], false) - barrier.CompleteJob() - case i < numTasks/2: - barrier.SignalDependants(anns[i], true) + err := barrier.SignalDependents( + anns[i], parentJobIDs[i], + ) + require.NoError(t, err) + barrier.CompleteJob() // At midpoint, quit the validation barrier. @@ -141,12 +150,7 @@ func TestValidationBarrierQuit(t *testing.T) { switch { // First half should return without failure. - case i < numTasks/4 && !graph.IsError( - err, graph.ErrParentValidationFailed, - ): - t.Fatalf("unexpected failure while waiting: %v", err) - - case i >= numTasks/4 && i < numTasks/2 && err != nil: + case i < numTasks/2 && err != nil: t.Fatalf("unexpected failure while waiting: %v", err) // Last half should return the shutdown error. @@ -159,6 +163,128 @@ func TestValidationBarrierQuit(t *testing.T) { } } +// TestValidationBarrierParentJobsClear tests that creating two parent jobs for +// ChannelUpdate / NodeAnnouncement will pause child jobs until the set of +// parent jobs has cleared. +func TestValidationBarrierParentJobsClear(t *testing.T) { + t.Parallel() + + const ( + numTasks = 8 + timeout = time.Second + ) + + quit := make(chan struct{}) + barrier := graph.NewValidationBarrier(numTasks, quit) + + sharedScid := lnwire.NewShortChanIDFromInt(0) + sharedNodeID := nodeIDFromInt(0) + + // Create a set of gossip messages that depend on each other. ann1 and + // ann2 share the ShortChannelID field. ann1 and ann3 share both the + // ShortChannelID field and the NodeID1 field. These shared values let + // us test the "set" properties of the ValidationBarrier. + ann1 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: sharedNodeID, + NodeID2: nodeIDFromInt(1), + } + parentID1, err := barrier.InitJobDependencies(ann1) + require.NoError(t, err) + + ann2 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: nodeIDFromInt(2), + NodeID2: nodeIDFromInt(3), + } + parentID2, err := barrier.InitJobDependencies(ann2) + require.NoError(t, err) + + ann3 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: sharedNodeID, + NodeID2: nodeIDFromInt(10), + } + parentID3, err := barrier.InitJobDependencies(ann3) + require.NoError(t, err) + + // Create the ChannelUpdate & NodeAnnouncement messages. 
+ upd1 := &lnwire.ChannelUpdate1{ + ShortChannelID: sharedScid, + } + childID1, err := barrier.InitJobDependencies(upd1) + require.NoError(t, err) + + node1 := &lnwire.NodeAnnouncement{ + NodeID: sharedNodeID, + } + childID2, err := barrier.InitJobDependencies(node1) + require.NoError(t, err) + + run := func(vb *graph.ValidationBarrier, childJobID graph.JobID, + job interface{}, resp chan error, start chan error) { + + close(start) + + err := vb.WaitForParents(childJobID, job) + resp <- err + } + + errChan := make(chan error, 2) + + startChan1 := make(chan error, 1) + startChan2 := make(chan error, 1) + + go run(barrier, childID1, upd1, errChan, startChan1) + go run(barrier, childID2, node1, errChan, startChan2) + + // Wait for the start signal since we are testing the case where the + // parent jobs only complete _after_ the child jobs have called. Note + // that there is technically an edge case where we receive the start + // signal and call SignalDependents before WaitForParents can actually + // be called in the goroutine launched above. In this case, which + // arises due to our inability to control precisely when these VB + // methods are scheduled (as they are in different goroutines), the + // test should still pass as we want to test that validation jobs are + // completing and not stalling. In other words, this issue with the + // test itself is good as it actually randomizes some of the ordering, + // occasionally. This tests that the VB is robust against ordering / + // concurrency issues. + select { + case <-startChan1: + case <-time.After(timeout): + t.Fatal("timed out waiting for startChan1") + } + + select { + case <-startChan2: + case <-time.After(timeout): + t.Fatal("timed out waiting for startChan2") + } + + // Now we can call SignalDependents for our parent jobs. + err = barrier.SignalDependents(ann1, parentID1) + require.NoError(t, err) + + err = barrier.SignalDependents(ann2, parentID2) + require.NoError(t, err) + + err = barrier.SignalDependents(ann3, parentID3) + require.NoError(t, err) + + select { + case <-errChan: + case <-time.After(timeout): + t.Fatal("unexpected timeout waiting for first error signal") + } + + select { + case <-errChan: + case <-time.After(timeout): + t.Fatal("unexpected timeout waiting for second error signal") + } +} + // nodeIDFromInt creates a node ID by writing a uint64 to the first 8 bytes. func nodeIDFromInt(i uint64) [33]byte { var nodeID [33]byte From 554d4910ebbaf43c96ca8f1bea83f34e1d2a5886 Mon Sep 17 00:00:00 2001 From: Eugene Siegel Date: Wed, 6 Nov 2024 12:55:42 -0500 Subject: [PATCH 3/3] release-notes: update for 0.19.0 --- docs/release-notes/release-notes-0.19.0.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 4fc64028d3..61feb29e17 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -198,6 +198,9 @@ added and they will be removed in a future release. The defaults values for these options have also been increased from max 3 log files to 10 and from max 10 MB to 20 MB. + +* Refactored the `ValidationBarrier` to use + [set-based dependency tracking](https://github.com/lightningnetwork/lnd/pull/9241). * [Deprecate `dust-threshold` config option](https://github.com/lightningnetwork/lnd/pull/9182) and introduce