diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 1e1c65ca38..290e529bd9 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -517,6 +517,9 @@ type AuthenticatedGossiper struct { // AuthenticatedGossiper lock. chanUpdateRateLimiter map[uint64][2]*rate.Limiter + // vb is used to enforce job dependency ordering of gossip messages. + vb *ValidationBarrier + sync.Mutex } @@ -542,6 +545,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper banman: newBanman(), } + gossiper.vb = NewValidationBarrier(1000, gossiper.quit) + gossiper.syncMgr = newSyncManager(&SyncManagerCfg{ ChainHash: cfg.ChainHash, ChanSeries: cfg.ChanSeries, @@ -1409,10 +1414,6 @@ func (d *AuthenticatedGossiper) networkHandler() { log.Errorf("Unable to rebroadcast stale announcements: %v", err) } - // We'll use this validation to ensure that we process jobs in their - // dependency order during parallel validation. - validationBarrier := graph.NewValidationBarrier(1000, d.quit) - for { select { // A new policy update has arrived. We'll commit it to the @@ -1481,11 +1482,17 @@ func (d *AuthenticatedGossiper) networkHandler() { // We'll set up any dependent, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. - validationBarrier.InitJobDependencies(announcement.msg) + annJobID, err := d.vb.InitJobDependencies( + announcement.msg, + ) + if err != nil { + announcement.err <- err + continue + } d.wg.Add(1) go d.handleNetworkMessages( - announcement, &announcements, validationBarrier, + announcement, &announcements, annJobID, ) // The trickle timer has ticked, which indicates we should @@ -1536,10 +1543,10 @@ func (d *AuthenticatedGossiper) networkHandler() { // // NOTE: must be run as a goroutine. func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, - deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) { + deDuped *deDupedAnnouncements, jobID JobID) { defer d.wg.Done() - defer vb.CompleteJob() + defer d.vb.CompleteJob() // We should only broadcast this message forward if it originated from // us or it wasn't received as part of our initial historical sync. @@ -1547,17 +1554,12 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message has an existing dependency, then we'll wait until // that has been fully validated before we proceed. - err := vb.WaitForDependants(nMsg.msg) + err := d.vb.WaitForParents(jobID, nMsg.msg) if err != nil { log.Debugf("Validating network message %s got err: %v", nMsg.msg.MsgType(), err) - if !graph.IsError( - err, - graph.ErrVBarrierShuttingDown, - graph.ErrParentValidationFailed, - ) { - + if errors.Is(err, ErrVBarrierShuttingDown) { log.Warnf("unexpected error during validation "+ "barrier shutdown: %v", err) } @@ -1577,7 +1579,16 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message had any dependencies, then we can now signal them to // continue. - vb.SignalDependants(nMsg.msg, allow) + err = d.vb.SignalDependents(nMsg.msg, jobID) + if err != nil { + // Something is wrong if SignalDependents returns an error. + log.Errorf("SignalDependents returned error for msg=%v with "+ + "JobID=%v", spew.Sdump(nMsg.msg), jobID) + + nMsg.err <- err + + return + } // If the announcement was accepted, then add the emitted announcements // to our announce batch to be broadcast once the trickle timer ticks @@ -2407,7 +2418,6 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, err, graph.ErrOutdated, graph.ErrIgnored, - graph.ErrVBarrierShuttingDown, ) { log.Error(err) @@ -3148,7 +3158,6 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, if graph.IsError( err, graph.ErrOutdated, graph.ErrIgnored, - graph.ErrVBarrierShuttingDown, ) { log.Debugf("Update edge for short_chan_id(%v) got: %v", diff --git a/discovery/validation_barrier.go b/discovery/validation_barrier.go new file mode 100644 index 0000000000..bd3bc70392 --- /dev/null +++ b/discovery/validation_barrier.go @@ -0,0 +1,464 @@ +package discovery + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) + +var ( + // ErrVBarrierShuttingDown signals that the barrier has been requested + // to shutdown, and that the caller should not treat the wait condition + // as fulfilled. + ErrVBarrierShuttingDown = errors.New("ValidationBarrier shutting down") +) + +// JobID identifies an active job in the validation barrier. It is large so +// that we don't need to worry about overflows. +type JobID uint64 + +// jobInfo stores job dependency info for a set of dependent gossip messages. +type jobInfo struct { + // activeParentJobIDs is the set of active parent job ids. + activeParentJobIDs fn.Set[JobID] + + // activeDependentJobs is the set of active dependent job ids. + activeDependentJobs fn.Set[JobID] +} + +// ValidationBarrier is a barrier used to enforce a strict validation order +// while concurrently validating other updates for channel edges. It uses a set +// of maps to track validation dependencies. This is needed in practice because +// gossip messages for a given channel may arive in order, but then due to +// scheduling in different goroutines, may be validated in the wrong order. +// With the ValidationBarrier, the dependent update will wait until the parent +// update completes. +type ValidationBarrier struct { + // validationSemaphore is a channel of structs which is used as a + // semaphore. Initially we'll fill this with a buffered channel of the + // size of the number of active requests. Each new job will consume + // from this channel, then restore the value upon completion. + validationSemaphore chan struct{} + + // jobInfoMap stores the set of job ids for each channel. + // NOTE: This MUST be used with the mutex. + // NOTE: This currently stores string representations of + // lnwire.ShortChannelID and route.Vertex. Since these are of different + // lengths, collision cannot occur in their string representations. + // N.B.: Check that any new string-converted types don't collide with + // existing string-converted types. + jobInfoMap map[string]*jobInfo + + // jobDependencies is a mapping from a child's JobID to the set of + // parent JobID that it depends on. + // NOTE: This MUST be used with the mutex. + jobDependencies map[JobID]fn.Set[JobID] + + // childJobChans stores the notification channel that each child job + // listens on for parent job completions. + // NOTE: This MUST be used with the mutex. + childJobChans map[JobID]chan struct{} + + // idCtr is an atomic integer that is used to assign JobIDs. + idCtr atomic.Uint64 + + quit chan struct{} + sync.Mutex +} + +// NewValidationBarrier creates a new instance of a validation barrier given +// the total number of active requests, and a quit channel which will be used +// to know when to kill pending, but unfilled jobs. +func NewValidationBarrier(numActiveReqs int, + quitChan chan struct{}) *ValidationBarrier { + + v := &ValidationBarrier{ + jobInfoMap: make(map[string]*jobInfo), + jobDependencies: make(map[JobID]fn.Set[JobID]), + childJobChans: make(map[JobID]chan struct{}), + quit: quitChan, + } + + // We'll first initialize a set of semaphores to limit our concurrency + // when validating incoming requests in parallel. + v.validationSemaphore = make(chan struct{}, numActiveReqs) + for i := 0; i < numActiveReqs; i++ { + v.validationSemaphore <- struct{}{} + } + + return v +} + +// InitJobDependencies will wait for a new job slot to become open, and then +// sets up any dependent signals/trigger for the new job. +func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID, + error) { + + // We'll wait for either a new slot to become open, or for the quit + // channel to be closed. + select { + case <-v.validationSemaphore: + case <-v.quit: + } + + v.Lock() + defer v.Unlock() + + // updateOrCreateJobInfo modifies the set of activeParentJobs for this + // annID and updates jobInfoMap. + updateOrCreateJobInfo := func(annID string, annJobID JobID) { + info, ok := v.jobInfoMap[annID] + if ok { + // If an entry already exists for annID, then a job + // related to it is being validated. Add to the set of + // parent job ids. This addition will only affect + // _later_, _child_ jobs for the annID. + info.activeParentJobIDs.Add(annJobID) + return + } + + // No entry exists for annID, meaning that we should create + // one. + parentJobSet := fn.NewSet(annJobID) + + info = &jobInfo{ + activeParentJobIDs: parentJobSet, + activeDependentJobs: fn.NewSet[JobID](), + } + v.jobInfoMap[annID] = info + } + + // populateDependencies populates the job dependency mappings (i.e. + // which should complete after another) for the (annID, childJobID) + // tuple. + populateDependencies := func(annID string, childJobID JobID) { + // If there is no entry in the jobInfoMap, we don't have to + // wait on any parent jobs to finish. + info, ok := v.jobInfoMap[annID] + if !ok { + return + } + + // We want to see a snapshot of active parent jobs for this + // annID that are already registered in activeParentJobIDs. The + // child job identified by childJobID can only run after these + // parent jobs have run. After grabbing the snapshot, we then + // want to persist a slice of these jobs. + + // Create the notification chan that parent jobs will send (or + // close) on when they complete. + jobChan := make(chan struct{}) + + // Add to set of activeDependentJobs for this annID. + info.activeDependentJobs.Add(childJobID) + + // Store in childJobChans. The parent jobs will fetch this chan + // to notify on. The child job will later fetch this chan to + // listen on when WaitForParents is called. + v.childJobChans[childJobID] = jobChan + + // Copy over the parent job IDs at this moment for this annID. + // This job must be processed AFTER those parent IDs. + parentJobs := info.activeParentJobIDs.Copy() + + // Populate the jobDependencies mapping. + v.jobDependencies[childJobID] = parentJobs + } + + // Once a slot is open, we'll examine the message of the job, to see if + // there need to be any dependent barriers set up. + switch msg := job.(type) { + case *lnwire.ChannelAnnouncement1: + id := JobID(v.idCtr.Add(1)) + + updateOrCreateJobInfo(msg.ShortChannelID.String(), id) + updateOrCreateJobInfo(route.Vertex(msg.NodeID1).String(), id) + updateOrCreateJobInfo(route.Vertex(msg.NodeID2).String(), id) + + return id, nil + + // Populate the dependency mappings for the below child jobs. + case *lnwire.ChannelUpdate1: + childJobID := JobID(v.idCtr.Add(1)) + populateDependencies(msg.ShortChannelID.String(), childJobID) + + return childJobID, nil + case *lnwire.NodeAnnouncement: + childJobID := JobID(v.idCtr.Add(1)) + populateDependencies( + route.Vertex(msg.NodeID).String(), childJobID, + ) + + return childJobID, nil + case *lnwire.AnnounceSignatures1: + // TODO(roasbeef): need to wait on chan ann? + // - We can do the above by calling populateDependencies. For + // now, while we evaluate potential side effects, don't do + // anything with childJobID and just return it. + childJobID := JobID(v.idCtr.Add(1)) + return childJobID, nil + + default: + // An invalid message was passed into InitJobDependencies. + // Return an error. + return JobID(0), errors.New("invalid message") + } +} + +// CompleteJob returns a free slot to the set of available job slots. This +// should be called once a job has been fully completed. Otherwise, slots may +// not be returned to the internal scheduling, causing a deadlock when a new +// overflow job is attempted. +func (v *ValidationBarrier) CompleteJob() { + select { + case v.validationSemaphore <- struct{}{}: + case <-v.quit: + } +} + +// WaitForParents will block until all parent job dependencies have went +// through the validation pipeline. This allows us a graceful way to run jobs +// in goroutines and still have strict ordering guarantees. If this job doesn't +// have any parent job dependencies, then this function will return +// immediately. +func (v *ValidationBarrier) WaitForParents(childJobID JobID, + job interface{}) error { + + var ( + ok bool + jobDesc string + + parentJobIDs fn.Set[JobID] + annID string + jobChan chan struct{} + ) + + // Acquire a lock to read ValidationBarrier. + v.Lock() + + switch msg := job.(type) { + // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the + // completion of any active ChannelAnnouncement jobs related to them. + case *lnwire.ChannelUpdate1: + annID = msg.ShortChannelID.String() + + parentJobIDs, ok = v.jobDependencies[childJobID] + if !ok { + // If ok is false, it means that this child job never + // had any parent jobs to wait on. + v.Unlock() + return nil + } + + jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v", + msg.ShortChannelID.ToUint64()) + + case *lnwire.NodeAnnouncement: + annID = route.Vertex(msg.NodeID).String() + + parentJobIDs, ok = v.jobDependencies[childJobID] + if !ok { + // If ok is false, it means that this child job never + // had any parent jobs to wait on. + v.Unlock() + return nil + } + + jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s", + route.Vertex(msg.NodeID)) + + // Other types of jobs can be executed immediately, so we'll just + // return directly. + case *lnwire.AnnounceSignatures1: + // TODO(roasbeef): need to wait on chan ann? + v.Unlock() + return nil + + case *lnwire.ChannelAnnouncement1: + v.Unlock() + return nil + } + + // Release the lock once the above read is finished. + v.Unlock() + + log.Debugf("Waiting for dependent on %s", jobDesc) + + v.Lock() + jobChan, ok = v.childJobChans[childJobID] + if !ok { + v.Unlock() + + // The entry may not exist because this job does not depend on + // any parent jobs. + return nil + } + v.Unlock() + + for { + select { + case <-v.quit: + return ErrVBarrierShuttingDown + + case <-jobChan: + // Every time this is sent on or if it's closed, a + // parent job has finished. The parent jobs have to + // also potentially close the channel because if all + // the parent jobs finish and call SignalDependents + // before the goroutine running WaitForParents has a + // chance to grab the notification chan from + // childJobChans, then the running goroutine will wait + // here for a notification forever. By having the last + // parent job close the notificiation chan, we avoid + // this issue. + + // Check and see if we have any parent jobs left. If we + // don't, we can finish up. + v.Lock() + info, found := v.jobInfoMap[annID] + if !found { + v.Unlock() + + // No parent job info found, proceed with + // validation. + return nil + } + + x := parentJobIDs.Intersect(info.activeParentJobIDs) + v.Unlock() + if x.IsEmpty() { + // The parent jobs have all completed. We can + // proceed with validation. + return nil + } + + // If we've reached this point, we are still waiting on + // a parent job to complete. + } + } +} + +// SignalDependents signals to any child jobs that this parent job has +// finished. +func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) error { + v.Lock() + defer v.Unlock() + + // removeJob either removes a child job or a parent job. If it is + // removing a child job, then it removes the child's JobID from the set + // of dependent jobs for the announcement ID. If this is removing a + // parent job, then it removes the parentJobID from the set of active + // parent jobs and notifies the child jobs that it has finished + // validating. + removeJob := func(annID string, id JobID, child bool) error { + if child { + // If we're removing a child job, check jobInfoMap and + // remove this job from activeDependentJobs. + info, ok := v.jobInfoMap[annID] + if ok { + info.activeDependentJobs.Remove(id) + } + + // Remove the notification chan from childJobChans. + delete(v.childJobChans, id) + + // Remove this job's dependency mapping. + delete(v.jobDependencies, id) + + return nil + } + + // Otherwise, we are removing a parent job. + jobInfo, found := v.jobInfoMap[annID] + if !found { + // NOTE: Some sort of consistency guarantee has been + // broken. + return fmt.Errorf("no job info found for "+ + "identifier(%v)", id) + } + + jobInfo.activeParentJobIDs.Remove(id) + + lastJob := jobInfo.activeParentJobIDs.IsEmpty() + + // Notify all dependent jobs that a parent job has completed. + for child := range jobInfo.activeDependentJobs { + notifyChan, ok := v.childJobChans[child] + if !ok { + // NOTE: Some sort of consistency guarantee has + // been broken. + return fmt.Errorf("no job info found for "+ + "identifier(%v)", id) + } + + // We don't want to block when sending out the signal. + select { + case notifyChan <- struct{}{}: + default: + } + + // If this is the last parent job for this annID, also + // close the channel. This is needed because it's + // possible that the parent job cleans up the job + // mappings before the goroutine handling the child job + // has a chance to call WaitForParents and catch the + // signal sent above. We are allowed to close because + // no other parent job will be able to send along the + // channel (or close) as we're removing the entry from + // the jobInfoMap below. + if lastJob { + close(notifyChan) + } + } + + // Remove from jobInfoMap if last job. + if lastJob { + delete(v.jobInfoMap, annID) + } + + return nil + } + + switch msg := job.(type) { + case *lnwire.ChannelAnnouncement1: + // Signal to the child jobs that parent validation has + // finished. We have to call removeJob for each annID + // that this ChannelAnnouncement can be associated with. + err := removeJob(msg.ShortChannelID.String(), id, false) + if err != nil { + return err + } + + err = removeJob(route.Vertex(msg.NodeID1).String(), id, false) + if err != nil { + return err + } + + err = removeJob(route.Vertex(msg.NodeID2).String(), id, false) + if err != nil { + return err + } + + return nil + + case *lnwire.NodeAnnouncement: + // Remove child job info. + return removeJob(route.Vertex(msg.NodeID).String(), id, true) + + case *lnwire.ChannelUpdate1: + // Remove child job info. + return removeJob(msg.ShortChannelID.String(), id, true) + + case *lnwire.AnnounceSignatures1: + // No dependency mappings are stored for AnnounceSignatures1, + // so do nothing. + return nil + } + + return errors.New("invalid message - no job dependencies") +} diff --git a/discovery/validation_barrier_test.go b/discovery/validation_barrier_test.go new file mode 100644 index 0000000000..bc58fe9ec9 --- /dev/null +++ b/discovery/validation_barrier_test.go @@ -0,0 +1,315 @@ +package discovery + +import ( + "encoding/binary" + "errors" + "sync" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +// TestValidationBarrierSemaphore checks basic properties of the validation +// barrier's semaphore wrt. enqueuing/dequeuing. +func TestValidationBarrierSemaphore(t *testing.T) { + t.Parallel() + + const ( + numTasks = 8 + numPendingTasks = 8 + timeout = 50 * time.Millisecond + ) + + quit := make(chan struct{}) + barrier := NewValidationBarrier(numTasks, quit) + + var scidMtx sync.RWMutex + currentScid := lnwire.ShortChannelID{} + + // Saturate the semaphore with jobs. + for i := 0; i < numTasks; i++ { + scidMtx.Lock() + dummyUpdate := &lnwire.ChannelUpdate1{ + ShortChannelID: currentScid, + } + currentScid.TxIndex++ + scidMtx.Unlock() + + _, err := barrier.InitJobDependencies(dummyUpdate) + require.NoError(t, err) + } + + // Spawn additional tasks that will signal completion when added. + jobAdded := make(chan struct{}) + for i := 0; i < numPendingTasks; i++ { + go func() { + scidMtx.Lock() + dummyUpdate := &lnwire.ChannelUpdate1{ + ShortChannelID: currentScid, + } + currentScid.TxIndex++ + scidMtx.Unlock() + + _, err := barrier.InitJobDependencies(dummyUpdate) + require.NoError(t, err) + + jobAdded <- struct{}{} + }() + } + + // Check that no jobs are added while semaphore is full. + select { + case <-time.After(timeout): + // Expected since no slots open. + case <-jobAdded: + t.Fatalf("job should not have been added") + } + + // Complete jobs one at a time and verify that they get added. + for i := 0; i < numPendingTasks; i++ { + barrier.CompleteJob() + + select { + case <-time.After(timeout): + t.Fatalf("timeout waiting for job to be added") + case <-jobAdded: + // Expected since one slot opened up. + } + } +} + +// TestValidationBarrierQuit checks that pending validation tasks will return an +// error from WaitForDependants if the barrier's quit signal is canceled. +func TestValidationBarrierQuit(t *testing.T) { + t.Parallel() + + const ( + numTasks = 8 + timeout = 50 * time.Millisecond + ) + + quit := make(chan struct{}) + barrier := NewValidationBarrier(2*numTasks, quit) + + // Create a set of unique channel announcements that we will prep for + // validation. + anns := make([]*lnwire.ChannelAnnouncement1, 0, numTasks) + parentJobIDs := make([]JobID, 0, numTasks) + for i := 0; i < numTasks; i++ { + anns = append(anns, &lnwire.ChannelAnnouncement1{ + ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), + NodeID1: nodeIDFromInt(uint64(2 * i)), + NodeID2: nodeIDFromInt(uint64(2*i + 1)), + }) + parentJobID, err := barrier.InitJobDependencies(anns[i]) + require.NoError(t, err) + + parentJobIDs = append(parentJobIDs, parentJobID) + } + + // Create a set of channel updates, that must wait until their + // associated channel announcement has been verified. + chanUpds := make([]*lnwire.ChannelUpdate1, 0, numTasks) + childJobIDs := make([]JobID, 0, numTasks) + for i := 0; i < numTasks; i++ { + chanUpds = append(chanUpds, &lnwire.ChannelUpdate1{ + ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), + }) + childJob, err := barrier.InitJobDependencies(chanUpds[i]) + require.NoError(t, err) + + childJobIDs = append(childJobIDs, childJob) + } + + // Spawn additional tasks that will send the error returned after + // waiting for the announcements to finish. In the background, we will + // iteratively queue the channel updates, which will send back the error + // returned from waiting. + jobErrs := make(chan error) + for i := 0; i < numTasks; i++ { + go func(ii int) { + jobErrs <- barrier.WaitForParents( + childJobIDs[ii], chanUpds[ii], + ) + }(i) + } + + // Check that no jobs are added while semaphore is full. + select { + case <-time.After(timeout): + // Expected since no slots open. + case <-jobErrs: + t.Fatalf("job should not have been signaled") + } + + // Complete the first half of jobs, one at a time, verifying that they + // get signaled. Then, quit the barrier and check that all others exit + // with the correct error. + for i := 0; i < numTasks; i++ { + switch { + case i < numTasks/2: + err := barrier.SignalDependents( + anns[i], parentJobIDs[i], + ) + require.NoError(t, err) + + barrier.CompleteJob() + + // At midpoint, quit the validation barrier. + case i == numTasks/2: + close(quit) + } + + var err error + select { + case <-time.After(timeout): + t.Fatalf("timeout waiting for job to be signaled") + case err = <-jobErrs: + } + + switch { + // First half should return without failure. + case i < numTasks/2 && err != nil: + t.Fatalf("unexpected failure while waiting: %v", err) + + // Last half should return the shutdown error. + case i >= numTasks/2 && !errors.Is( + err, ErrVBarrierShuttingDown, + ): + + t.Fatalf("expected failure after quitting: want %v, "+ + "got %v", ErrVBarrierShuttingDown, err) + } + } +} + +// TestValidationBarrierParentJobsClear tests that creating two parent jobs for +// ChannelUpdate / NodeAnnouncement will pause child jobs until the set of +// parent jobs has cleared. +func TestValidationBarrierParentJobsClear(t *testing.T) { + t.Parallel() + + const ( + numTasks = 8 + timeout = time.Second + ) + + quit := make(chan struct{}) + barrier := NewValidationBarrier(numTasks, quit) + + sharedScid := lnwire.NewShortChanIDFromInt(0) + sharedNodeID := nodeIDFromInt(0) + + // Create a set of gossip messages that depend on each other. ann1 and + // ann2 share the ShortChannelID field. ann1 and ann3 share both the + // ShortChannelID field and the NodeID1 field. These shared values let + // us test the "set" properties of the ValidationBarrier. + ann1 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: sharedNodeID, + NodeID2: nodeIDFromInt(1), + } + parentID1, err := barrier.InitJobDependencies(ann1) + require.NoError(t, err) + + ann2 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: nodeIDFromInt(2), + NodeID2: nodeIDFromInt(3), + } + parentID2, err := barrier.InitJobDependencies(ann2) + require.NoError(t, err) + + ann3 := &lnwire.ChannelAnnouncement1{ + ShortChannelID: sharedScid, + NodeID1: sharedNodeID, + NodeID2: nodeIDFromInt(10), + } + parentID3, err := barrier.InitJobDependencies(ann3) + require.NoError(t, err) + + // Create the ChannelUpdate & NodeAnnouncement messages. + upd1 := &lnwire.ChannelUpdate1{ + ShortChannelID: sharedScid, + } + childID1, err := barrier.InitJobDependencies(upd1) + require.NoError(t, err) + + node1 := &lnwire.NodeAnnouncement{ + NodeID: sharedNodeID, + } + childID2, err := barrier.InitJobDependencies(node1) + require.NoError(t, err) + + run := func(vb *ValidationBarrier, childJobID JobID, job interface{}, + resp chan error, start chan error) { + + close(start) + + err := vb.WaitForParents(childJobID, job) + resp <- err + } + + errChan := make(chan error, 2) + + startChan1 := make(chan error, 1) + startChan2 := make(chan error, 1) + + go run(barrier, childID1, upd1, errChan, startChan1) + go run(barrier, childID2, node1, errChan, startChan2) + + // Wait for the start signal since we are testing the case where the + // parent jobs only complete _after_ the child jobs have called. Note + // that there is technically an edge case where we receive the start + // signal and call SignalDependents before WaitForParents can actually + // be called in the goroutine launched above. In this case, which + // arises due to our inability to control precisely when these VB + // methods are scheduled (as they are in different goroutines), the + // test should still pass as we want to test that validation jobs are + // completing and not stalling. In other words, this issue with the + // test itself is good as it actually randomizes some of the ordering, + // occasionally. This tests that the VB is robust against ordering / + // concurrency issues. + select { + case <-startChan1: + case <-time.After(timeout): + t.Fatal("timed out waiting for startChan1") + } + + select { + case <-startChan2: + case <-time.After(timeout): + t.Fatal("timed out waiting for startChan2") + } + + // Now we can call SignalDependents for our parent jobs. + err = barrier.SignalDependents(ann1, parentID1) + require.NoError(t, err) + + err = barrier.SignalDependents(ann2, parentID2) + require.NoError(t, err) + + err = barrier.SignalDependents(ann3, parentID3) + require.NoError(t, err) + + select { + case <-errChan: + case <-time.After(timeout): + t.Fatal("unexpected timeout waiting for first error signal") + } + + select { + case <-errChan: + case <-time.After(timeout): + t.Fatal("unexpected timeout waiting for second error signal") + } +} + +// nodeIDFromInt creates a node ID by writing a uint64 to the first 8 bytes. +func nodeIDFromInt(i uint64) [33]byte { + var nodeID [33]byte + binary.BigEndian.PutUint64(nodeID[:8], i) + return nodeID +} diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 2d4edf1e20..d9f5c14928 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -220,6 +220,9 @@ added and they will be removed in a future release. The defaults values for these options have also been increased from max 3 log files to 10 and from max 10 MB to 20 MB. + +* Refactored the `ValidationBarrier` to use + [set-based dependency tracking](https://github.com/lightningnetwork/lnd/pull/9241). * [Deprecate `dust-threshold` config option](https://github.com/lightningnetwork/lnd/pull/9182) and introduce diff --git a/graph/builder.go b/graph/builder.go index 1aa9488007..6321562822 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -3,7 +3,6 @@ package graph import ( "bytes" "fmt" - "runtime" "strings" "sync" "sync/atomic" @@ -669,51 +668,21 @@ func (b *Builder) pruneZombieChans() error { // notifies topology changes, if any. // // NOTE: must be run inside goroutine. -func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier, - update *routingMsg) { - +func (b *Builder) handleNetworkUpdate(update *routingMsg) { defer b.wg.Done() - defer vb.CompleteJob() - - // If this message has an existing dependency, then we'll wait until - // that has been fully validated before we proceed. - err := vb.WaitForDependants(update.msg) - if err != nil { - switch { - case IsError(err, ErrVBarrierShuttingDown): - update.err <- err - - case IsError(err, ErrParentValidationFailed): - update.err <- NewErrf(ErrIgnored, err.Error()) //nolint - - default: - log.Warnf("unexpected error during validation "+ - "barrier shutdown: %v", err) - update.err <- err - } - - return - } // Process the routing update to determine if this is either a new // update from our PoV or an update to a prior vertex/edge we // previously accepted. - err = b.processUpdate(update.msg, update.op...) + err := b.processUpdate(update.msg, update.op...) update.err <- err - // If this message had any dependencies, then we can now signal them to - // continue. - allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated) - vb.SignalDependants(update.msg, allowDependents) - // If the error is not nil here, there's no need to send topology // change. if err != nil { - // We now decide to log an error or not. If allowDependents is - // false, it means there is an error and the error is neither - // ErrIgnored or ErrOutdated. In this case, we'll log an error. - // Otherwise, we'll add debug log only. - if allowDependents { + // Log as a debug message if this is not an error we need to be + // concerned about. + if IsError(err, ErrIgnored, ErrOutdated) { log.Debugf("process network updates got: %v", err) } else { log.Errorf("process network updates got: %v", err) @@ -753,31 +722,6 @@ func (b *Builder) networkHandler() { b.stats.Reset() - // We'll use this validation barrier to ensure that we process all jobs - // in the proper order during parallel validation. - // - // NOTE: For AssumeChannelValid, we bump up the maximum number of - // concurrent validation requests since there are no blocks being - // fetched. This significantly increases the performance of IGD for - // neutrino nodes. - // - // However, we dial back to use multiple of the number of cores when - // fully validating, to avoid fetching up to 1000 blocks from the - // backend. On bitcoind, this will empirically cause massive latency - // spikes when executing this many concurrent RPC calls. Critical - // subsystems or basic rpc calls that rely on calls such as GetBestBlock - // will hang due to excessive load. - // - // See https://github.com/lightningnetwork/lnd/issues/4892. - var validationBarrier *ValidationBarrier - if b.cfg.AssumeChannelValid { - validationBarrier = NewValidationBarrier(1000, b.quit) - } else { - validationBarrier = NewValidationBarrier( - 4*runtime.NumCPU(), b.quit, - ) - } - for { // If there are stats, resume the statTicker. if !b.stats.Empty() { @@ -789,13 +733,8 @@ func (b *Builder) networkHandler() { // result we'll modify the channel graph accordingly depending // on the exact type of the message. case update := <-b.networkUpdates: - // We'll set up any dependants, and wait until a free - // slot for this job opens up, this allows us to not - // have thousands of goroutines active. - validationBarrier.InitJobDependencies(update.msg) - b.wg.Add(1) - go b.handleNetworkUpdate(validationBarrier, update) + go b.handleNetworkUpdate(update) // TODO(roasbeef): remove all unconnected vertexes // after N blocks pass with no corresponding diff --git a/graph/errors.go b/graph/errors.go index 0a1d6fd244..30996bb948 100644 --- a/graph/errors.go +++ b/graph/errors.go @@ -28,15 +28,6 @@ const ( // ErrInvalidFundingOutput is returned if the channel funding output // fails validation. ErrInvalidFundingOutput - - // ErrVBarrierShuttingDown signals that the barrier has been requested - // to shutdown, and that the caller should not treat the wait condition - // as fulfilled. - ErrVBarrierShuttingDown - - // ErrParentValidationFailed signals that the validation of a - // dependent's parent failed, so the dependent must not be processed. - ErrParentValidationFailed ) // Error is a structure that represent the error inside the graph package, diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go deleted file mode 100644 index a97709605e..0000000000 --- a/graph/validation_barrier.go +++ /dev/null @@ -1,306 +0,0 @@ -package graph - -import ( - "fmt" - "sync" - - "github.com/lightningnetwork/lnd/graph/db/models" - "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/routing/route" -) - -// validationSignals contains two signals which allows the ValidationBarrier to -// communicate back to the caller whether a dependent should be processed or not -// based on whether its parent was successfully validated. Only one of these -// signals is to be used at a time. -type validationSignals struct { - // allow is the signal used to allow a dependent to be processed. - allow chan struct{} - - // deny is the signal used to prevent a dependent from being processed. - deny chan struct{} -} - -// ValidationBarrier is a barrier used to ensure proper validation order while -// concurrently validating new announcements for channel edges, and the -// attributes of channel edges. It uses this set of maps (protected by this -// mutex) to track validation dependencies. For a given channel our -// dependencies look like this: chanAnn <- chanUp <- nodeAnn. That is we must -// validate the item on the left of the arrow before that on the right. -type ValidationBarrier struct { - // validationSemaphore is a channel of structs which is used as a - // semaphore. Initially we'll fill this with a buffered channel of the - // size of the number of active requests. Each new job will consume - // from this channel, then restore the value upon completion. - validationSemaphore chan struct{} - - // chanAnnFinSignal is map that keep track of all the pending - // ChannelAnnouncement like validation job going on. Once the job has - // been completed, the channel will be closed unblocking any - // dependants. - chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals - - // chanEdgeDependencies tracks any channel edge updates which should - // wait until the completion of the ChannelAnnouncement before - // proceeding. This is a dependency, as we can't validate the update - // before we validate the announcement which creates the channel - // itself. - chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals - - // nodeAnnDependencies tracks any pending NodeAnnouncement validation - // jobs which should wait until the completion of the - // ChannelAnnouncement before proceeding. - nodeAnnDependencies map[route.Vertex]*validationSignals - - quit chan struct{} - sync.Mutex -} - -// NewValidationBarrier creates a new instance of a validation barrier given -// the total number of active requests, and a quit channel which will be used -// to know when to kill pending, but unfilled jobs. -func NewValidationBarrier(numActiveReqs int, - quitChan chan struct{}) *ValidationBarrier { - - v := &ValidationBarrier{ - chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals), - chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals), - nodeAnnDependencies: make(map[route.Vertex]*validationSignals), - quit: quitChan, - } - - // We'll first initialize a set of semaphores to limit our concurrency - // when validating incoming requests in parallel. - v.validationSemaphore = make(chan struct{}, numActiveReqs) - for i := 0; i < numActiveReqs; i++ { - v.validationSemaphore <- struct{}{} - } - - return v -} - -// InitJobDependencies will wait for a new job slot to become open, and then -// sets up any dependent signals/trigger for the new job -func (v *ValidationBarrier) InitJobDependencies(job interface{}) { - // We'll wait for either a new slot to become open, or for the quit - // channel to be closed. - select { - case <-v.validationSemaphore: - case <-v.quit: - } - - v.Lock() - defer v.Unlock() - - // Once a slot is open, we'll examine the message of the job, to see if - // there need to be any dependent barriers set up. - switch msg := job.(type) { - - // If this is a channel announcement, then we'll need to set up den - // tenancies, as we'll need to verify this before we verify any - // ChannelUpdates for the same channel, or NodeAnnouncements of nodes - // that are involved in this channel. This goes for both the wire - // type,s and also the types that we use within the database. - case *lnwire.ChannelAnnouncement1: - - // We ensure that we only create a new announcement signal iff, - // one doesn't already exist, as there may be duplicate - // announcements. We'll close this signal once the - // ChannelAnnouncement has been validated. This will result in - // all the dependent jobs being unlocked so they can finish - // execution themselves. - if _, ok := v.chanAnnFinSignal[msg.ShortChannelID]; !ok { - // We'll create the channel that we close after we - // validate this announcement. All dependants will - // point to this same channel, so they'll be unblocked - // at the same time. - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[msg.ShortChannelID] = signals - v.chanEdgeDependencies[msg.ShortChannelID] = signals - - v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals - } - case *models.ChannelEdgeInfo: - - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - if _, ok := v.chanAnnFinSignal[shortID]; !ok { - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[shortID] = signals - v.chanEdgeDependencies[shortID] = signals - - v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals - } - - // These other types don't have any dependants, so no further - // initialization needs to be done beyond just occupying a job slot. - case *models.ChannelEdgePolicy: - return - case *lnwire.ChannelUpdate1: - return - case *lnwire.NodeAnnouncement: - // TODO(roasbeef): node ann needs to wait on existing channel updates - return - case *models.LightningNode: - return - case *lnwire.AnnounceSignatures1: - // TODO(roasbeef): need to wait on chan ann? - return - } -} - -// CompleteJob returns a free slot to the set of available job slots. This -// should be called once a job has been fully completed. Otherwise, slots may -// not be returned to the internal scheduling, causing a deadlock when a new -// overflow job is attempted. -func (v *ValidationBarrier) CompleteJob() { - select { - case v.validationSemaphore <- struct{}{}: - case <-v.quit: - } -} - -// WaitForDependants will block until any jobs that this job dependants on have -// finished executing. This allows us a graceful way to schedule goroutines -// based on any pending uncompleted dependent jobs. If this job doesn't have an -// active dependent, then this function will return immediately. -func (v *ValidationBarrier) WaitForDependants(job interface{}) error { - - var ( - signals *validationSignals - ok bool - jobDesc string - ) - - // Acquire a lock to read ValidationBarrier. - v.Lock() - - switch msg := job.(type) { - // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the - // completion of any active ChannelAnnouncement jobs related to them. - case *models.ChannelEdgePolicy: - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - signals, ok = v.chanEdgeDependencies[shortID] - - jobDesc = fmt.Sprintf("job=lnwire.ChannelEdgePolicy, scid=%v", - msg.ChannelID) - - case *models.LightningNode: - vertex := route.Vertex(msg.PubKeyBytes) - signals, ok = v.nodeAnnDependencies[vertex] - - jobDesc = fmt.Sprintf("job=channeldb.LightningNode, pub=%s", - vertex) - - case *lnwire.ChannelUpdate1: - signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] - - jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v", - msg.ShortChannelID.ToUint64()) - - case *lnwire.NodeAnnouncement: - vertex := route.Vertex(msg.NodeID) - signals, ok = v.nodeAnnDependencies[vertex] - jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s", - vertex) - - // Other types of jobs can be executed immediately, so we'll just - // return directly. - case *lnwire.AnnounceSignatures1: - // TODO(roasbeef): need to wait on chan ann? - case *models.ChannelEdgeInfo: - case *lnwire.ChannelAnnouncement1: - } - - // Release the lock once the above read is finished. - v.Unlock() - - // If it's not ok, it means either the job is not a dependent type, or - // it doesn't have a dependency signal. Either way, we can return - // early. - if !ok { - return nil - } - - log.Debugf("Waiting for dependent on %s", jobDesc) - - // If we do have an active job, then we'll wait until either the signal - // is closed, or the set of jobs exits. - select { - case <-v.quit: - return NewErrf(ErrVBarrierShuttingDown, - "validation barrier shutting down") - - case <-signals.deny: - log.Debugf("Signal deny for %s", jobDesc) - return NewErrf(ErrParentValidationFailed, - "parent validation failed") - - case <-signals.allow: - log.Tracef("Signal allow for %s", jobDesc) - return nil - } -} - -// SignalDependants will allow/deny any jobs that are dependent on this job that -// they can continue execution. If the job doesn't have any dependants, then -// this function sill exit immediately. -func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { - v.Lock() - defer v.Unlock() - - switch msg := job.(type) { - - // If we've just finished executing a ChannelAnnouncement, then we'll - // close out the signal, and remove the signal from the map of active - // ones. This will allow/deny any dependent jobs to continue execution. - case *models.ChannelEdgeInfo: - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - finSignals, ok := v.chanAnnFinSignal[shortID] - if ok { - if allow { - close(finSignals.allow) - } else { - close(finSignals.deny) - } - delete(v.chanAnnFinSignal, shortID) - } - case *lnwire.ChannelAnnouncement1: - finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] - if ok { - if allow { - close(finSignals.allow) - } else { - close(finSignals.deny) - } - delete(v.chanAnnFinSignal, msg.ShortChannelID) - } - - delete(v.chanEdgeDependencies, msg.ShortChannelID) - - // For all other job types, we'll delete the tracking entries from the - // map, as if we reach this point, then all dependants have already - // finished executing and we can proceed. - case *models.LightningNode: - delete(v.nodeAnnDependencies, route.Vertex(msg.PubKeyBytes)) - case *lnwire.NodeAnnouncement: - delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID)) - case *lnwire.ChannelUpdate1: - delete(v.chanEdgeDependencies, msg.ShortChannelID) - case *models.ChannelEdgePolicy: - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - delete(v.chanEdgeDependencies, shortID) - - case *lnwire.AnnounceSignatures1: - return - } -} diff --git a/graph/validation_barrier_test.go b/graph/validation_barrier_test.go deleted file mode 100644 index 38fc7a0870..0000000000 --- a/graph/validation_barrier_test.go +++ /dev/null @@ -1,167 +0,0 @@ -package graph_test - -import ( - "encoding/binary" - "testing" - "time" - - "github.com/lightningnetwork/lnd/graph" - "github.com/lightningnetwork/lnd/lnwire" -) - -// TestValidationBarrierSemaphore checks basic properties of the validation -// barrier's semaphore wrt. enqueuing/dequeuing. -func TestValidationBarrierSemaphore(t *testing.T) { - t.Parallel() - - const ( - numTasks = 8 - numPendingTasks = 8 - timeout = 50 * time.Millisecond - ) - - quit := make(chan struct{}) - barrier := graph.NewValidationBarrier(numTasks, quit) - - // Saturate the semaphore with jobs. - for i := 0; i < numTasks; i++ { - barrier.InitJobDependencies(nil) - } - - // Spawn additional tasks that will signal completion when added. - jobAdded := make(chan struct{}) - for i := 0; i < numPendingTasks; i++ { - go func() { - barrier.InitJobDependencies(nil) - jobAdded <- struct{}{} - }() - } - - // Check that no jobs are added while semaphore is full. - select { - case <-time.After(timeout): - // Expected since no slots open. - case <-jobAdded: - t.Fatalf("job should not have been added") - } - - // Complete jobs one at a time and verify that they get added. - for i := 0; i < numPendingTasks; i++ { - barrier.CompleteJob() - - select { - case <-time.After(timeout): - t.Fatalf("timeout waiting for job to be added") - case <-jobAdded: - // Expected since one slot opened up. - } - } -} - -// TestValidationBarrierQuit checks that pending validation tasks will return an -// error from WaitForDependants if the barrier's quit signal is canceled. -func TestValidationBarrierQuit(t *testing.T) { - t.Parallel() - - const ( - numTasks = 8 - timeout = 50 * time.Millisecond - ) - - quit := make(chan struct{}) - barrier := graph.NewValidationBarrier(2*numTasks, quit) - - // Create a set of unique channel announcements that we will prep for - // validation. - anns := make([]*lnwire.ChannelAnnouncement1, 0, numTasks) - for i := 0; i < numTasks; i++ { - anns = append(anns, &lnwire.ChannelAnnouncement1{ - ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), - NodeID1: nodeIDFromInt(uint64(2 * i)), - NodeID2: nodeIDFromInt(uint64(2*i + 1)), - }) - barrier.InitJobDependencies(anns[i]) - } - - // Create a set of channel updates, that must wait until their - // associated channel announcement has been verified. - chanUpds := make([]*lnwire.ChannelUpdate1, 0, numTasks) - for i := 0; i < numTasks; i++ { - chanUpds = append(chanUpds, &lnwire.ChannelUpdate1{ - ShortChannelID: lnwire.NewShortChanIDFromInt(uint64(i)), - }) - barrier.InitJobDependencies(chanUpds[i]) - } - - // Spawn additional tasks that will send the error returned after - // waiting for the announcements to finish. In the background, we will - // iteratively queue the channel updates, which will send back the error - // returned from waiting. - jobErrs := make(chan error) - for i := 0; i < numTasks; i++ { - go func(ii int) { - jobErrs <- barrier.WaitForDependants(chanUpds[ii]) - }(i) - } - - // Check that no jobs are added while semaphore is full. - select { - case <-time.After(timeout): - // Expected since no slots open. - case <-jobErrs: - t.Fatalf("job should not have been signaled") - } - - // Complete the first half of jobs, one at a time, verifying that they - // get signaled. Then, quit the barrier and check that all others exit - // with the correct error. - for i := 0; i < numTasks; i++ { - switch { - // Signal completion for the first half of tasks, but only allow - // dependents to be processed as well for the second quarter. - case i < numTasks/4: - barrier.SignalDependants(anns[i], false) - barrier.CompleteJob() - - case i < numTasks/2: - barrier.SignalDependants(anns[i], true) - barrier.CompleteJob() - - // At midpoint, quit the validation barrier. - case i == numTasks/2: - close(quit) - } - - var err error - select { - case <-time.After(timeout): - t.Fatalf("timeout waiting for job to be signaled") - case err = <-jobErrs: - } - - switch { - // First half should return without failure. - case i < numTasks/4 && !graph.IsError( - err, graph.ErrParentValidationFailed, - ): - t.Fatalf("unexpected failure while waiting: %v", err) - - case i >= numTasks/4 && i < numTasks/2 && err != nil: - t.Fatalf("unexpected failure while waiting: %v", err) - - // Last half should return the shutdown error. - case i >= numTasks/2 && !graph.IsError( - err, graph.ErrVBarrierShuttingDown, - ): - t.Fatalf("expected failure after quitting: want %v, "+ - "got %v", graph.ErrVBarrierShuttingDown, err) - } - } -} - -// nodeIDFromInt creates a node ID by writing a uint64 to the first 8 bytes. -func nodeIDFromInt(i uint64) [33]byte { - var nodeID [33]byte - binary.BigEndian.PutUint64(nodeID[:8], i) - return nodeID -}