From 5ab7582fe518bcf380b2d59656d0e8ad1bd80b75 Mon Sep 17 00:00:00 2001 From: Eugene Siegel Date: Tue, 7 Jan 2025 14:08:41 -0500 Subject: [PATCH] go.mod+htlcswitch+protofsm: update fn package to v2.0.7 --- go.mod | 2 +- go.sum | 4 ++-- htlcswitch/link.go | 41 ++++++++++++++++++++------------------- htlcswitch/link_test.go | 14 ++++++------- htlcswitch/test_utils.go | 2 +- protofsm/state_machine.go | 13 ++++++------- 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 7680509fd3..adf765cfbd 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb github.com/lightningnetwork/lnd/cert v1.2.2 github.com/lightningnetwork/lnd/clock v1.1.1 - github.com/lightningnetwork/lnd/fn/v2 v2.0.2 + github.com/lightningnetwork/lnd/fn/v2 v2.0.7 github.com/lightningnetwork/lnd/healthcheck v1.2.6 github.com/lightningnetwork/lnd/kvdb v1.4.12 github.com/lightningnetwork/lnd/queue v1.1.1 diff --git a/go.sum b/go.sum index 5ed9cd2046..74a6fcafb7 100644 --- a/go.sum +++ b/go.sum @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U= github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0= github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ= -github.com/lightningnetwork/lnd/fn/v2 v2.0.2 h1:M7o2lYrh/zCp+lntPB3WP/rWTu5U+4ssyHW+kqNJ0fs= -github.com/lightningnetwork/lnd/fn/v2 v2.0.2/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= +github.com/lightningnetwork/lnd/fn/v2 v2.0.7 h1:2LkgcGk20vXcUJyrlYLWMptnEouOBnCixskMsQW+GxU= +github.com/lightningnetwork/lnd/fn/v2 v2.0.7/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= github.com/lightningnetwork/lnd/kvdb v1.4.12 h1:Y0WY5Tbjyjn6eCYh068qkWur5oFtioJlfxc8w5SlJeQ= diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 6e67e87def..508a73522e 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -2,6 +2,7 @@ package htlcswitch import ( "bytes" + "context" crand "crypto/rand" "crypto/sha256" "errors" @@ -596,7 +597,7 @@ func (l *channelLink) Start() error { l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout()) - l.Wg.Add(1) + l.WgAdd(1) go l.htlcManager() return nil @@ -636,8 +637,8 @@ func (l *channelLink) Stop() { l.hodlQueue.Stop() } - close(l.Quit) - l.Wg.Wait() + l.Quit() + l.WgWait() // Now that the htlcManager has completely exited, reset the packet // courier. This allows the mailbox to revaluate any lingering Adds that @@ -662,7 +663,7 @@ func (l *channelLink) Stop() { // WaitForShutdown blocks until the link finishes shutting down, which includes // termination of all dependent goroutines. func (l *channelLink) WaitForShutdown() { - l.Wg.Wait() + l.WgWait() } // EligibleToForward returns a bool indicating if the channel is able to @@ -740,7 +741,7 @@ func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool { func (l *channelLink) OnFlushedOnce(hook func()) { select { case l.flushHooks.newTransients <- hook: - case <-l.Quit: + case <-l.Done(): } } @@ -759,7 +760,7 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) { select { case queue <- hook: - case <-l.Quit: + case <-l.Done(): } } @@ -777,7 +778,7 @@ func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] { select { case l.quiescenceReqs <- req: - case <-l.Quit: + case <-l.Done(): req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown)) } @@ -989,7 +990,7 @@ func (l *channelLink) syncChanStates() error { // We've just received a ChanSync message from the remote // party, so we'll process the message in order to determine // if we need to re-transmit any messages to the remote party. - ctx, cancel := l.WithCtxQuitNoTimeout() + ctx, cancel := l.Create(context.Background()) defer cancel() msgsToReSend, openedCircuits, closedCircuits, err = l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg) @@ -1021,7 +1022,7 @@ func (l *channelLink) syncChanStates() error { l.cfg.Peer.SendMessage(false, msg) } - case <-l.Quit: + case <-l.Done(): return ErrLinkShuttingDown } @@ -1111,7 +1112,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { // // NOTE: This MUST be run as a goroutine. func (l *channelLink) fwdPkgGarbager() { - defer l.Wg.Done() + defer l.WgDone() l.cfg.FwdPkgGCTicker.Resume() defer l.cfg.FwdPkgGCTicker.Stop() @@ -1128,7 +1129,7 @@ func (l *channelLink) fwdPkgGarbager() { err) continue } - case <-l.Quit: + case <-l.Done(): return } } @@ -1251,7 +1252,7 @@ func (l *channelLink) handleChanSyncErr(err error) { func (l *channelLink) htlcManager() { defer func() { l.cfg.BatchTicker.Stop() - l.Wg.Done() + l.WgDone() l.log.Infof("exited") }() @@ -1345,7 +1346,7 @@ func (l *channelLink) htlcManager() { // With our link's in-memory state fully reconstructed, spawn a // goroutine to manage the reclamation of disk space occupied by // completed forwarding packages. - l.Wg.Add(1) + l.WgAdd(1) go l.fwdPkgGarbager() } @@ -1543,7 +1544,7 @@ func (l *channelLink) htlcManager() { } } - case <-l.Quit: + case <-l.Done(): return } } @@ -2418,7 +2419,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } select { - case <-l.Quit: + case <-l.Done(): return default: } @@ -2488,7 +2489,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } select { - case <-l.Quit: + case <-l.Done(): return default: } @@ -2782,7 +2783,7 @@ func (l *channelLink) updateCommitTx() error { return nil } - ctx, done := l.WithCtxQuitNoTimeout() + ctx, done := l.Create(context.Background()) defer done() newCommit, err := l.channel.SignNextCommitment(ctx) @@ -2822,7 +2823,7 @@ func (l *channelLink) updateCommitTx() error { } select { - case <-l.Quit: + case <-l.Done(): return ErrLinkShuttingDown default: } @@ -3529,7 +3530,7 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error { // NOTE: Part of the ChannelLink interface. func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { select { - case <-l.Quit: + case <-l.Done(): // Return early if the link is already in the process of // quitting. It doesn't make sense to hand the message to the // mailbox here. @@ -4290,7 +4291,7 @@ func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) { filteredPkts = append(filteredPkts, pkt) } - err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...) + err := l.cfg.ForwardPackets(l.Done(), replay, filteredPkts...) if err != nil { log.Errorf("Unhandled error while reforwarding htlc "+ "settle/fail over htlcswitch: %v", err) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 1747105597..35f39c0267 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -2257,7 +2257,7 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.Done(): close(doneChan) return } @@ -2326,7 +2326,7 @@ func handleStateUpdate(link *channelLink, } link.HandleChannelUpdate(remoteRev) - ctx, done := link.WithCtxQuitNoTimeout() + ctx, done := link.Create(context.Background()) defer done() remoteSigs, err := remoteChannel.SignNextCommitment(ctx) @@ -2372,7 +2372,7 @@ func updateState(batchTick chan time.Time, link *channelLink, // Trigger update by ticking the batchTicker. select { case batchTick <- time.Now(): - case <-link.Quit: + case <-link.Done(): return fmt.Errorf("link shutting down") } return handleStateUpdate(link, remoteChannel) @@ -2380,7 +2380,7 @@ func updateState(batchTick chan time.Time, link *channelLink, // The remote is triggering the state update, emulate this by // signing and sending CommitSig to the link. - ctx, done := link.WithCtxQuitNoTimeout() + ctx, done := link.Create(context.Background()) defer done() remoteSigs, err := remoteChannel.SignNextCommitment(ctx) @@ -4946,7 +4946,7 @@ func (h *persistentLinkHarness) restartLink( for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.Done(): close(doneChan) return } @@ -5932,7 +5932,7 @@ func TestChannelLinkFail(t *testing.T) { // Sign a commitment that will include // signature for the HTLC just sent. - quitCtx, done := c.WithCtxQuitNoTimeout() + quitCtx, done := c.Create(context.Background()) defer done() sigs, err := remoteChannel.SignNextCommitment( @@ -5979,7 +5979,7 @@ func TestChannelLinkFail(t *testing.T) { // Sign a commitment that will include // signature for the HTLC just sent. - quitCtx, done := c.WithCtxQuitNoTimeout() + quitCtx, done := c.Create(context.Background()) defer done() sigs, err := remoteChannel.SignNextCommitment( diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 2123465884..943f8cabb5 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1190,7 +1190,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.Done(): close(doneChan) return } diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 2cc1219022..4464991633 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -195,12 +195,11 @@ type StateMachineCfg[Event any, Env Environment] struct { // state such as a txid confirmation event. func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:ll ) StateMachine[Event, Env] { - return StateMachine[Event, Env]{ cfg: cfg, events: make(chan Event, 1), stateQuery: make(chan stateQuery[Event, Env]), - wg: *fn.NewGoroutineManager(context.Background()), + wg: *fn.NewGoroutineManager(), newStateEvents: fn.NewEventDistributor[State[Event, Env]](), quit: make(chan struct{}), } @@ -210,7 +209,7 @@ func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env] // the state machine to completion. func (s *StateMachine[Event, Env]) Start() { s.startOnce.Do(func() { - _ = s.wg.Go(func(ctx context.Context) { + _ = s.wg.Go(context.Background(), func(ctx context.Context) { s.driveMachine() }) }) @@ -355,7 +354,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // If a post-send event was specified, then we'll funnel // that back into the main state machine now as well. return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll - launched := s.wg.Go(func(ctx context.Context) { + launched := s.wg.Go(context.Background(), func(ctx context.Context) { log.Debugf("FSM(%v): sending "+ "post-send event: %v", s.cfg.Env.Name(), @@ -382,7 +381,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( // Otherwise, this has a SendWhen predicate, so we'll need // launch a goroutine to poll the SendWhen, then send only once // the predicate is true. - launched := s.wg.Go(func(ctx context.Context) { + launched := s.wg.Go(context.Background(), func(ctx context.Context) { predicateTicker := time.NewTicker( s.cfg.CustomPollInterval.UnwrapOr(pollInterval), ) @@ -456,7 +455,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( return fmt.Errorf("unable to register spend: %w", err) } - launched := s.wg.Go(func(ctx context.Context) { + launched := s.wg.Go(context.Background(), func(ctx context.Context) { for { select { case spend, ok := <-spendEvent.Spend: @@ -502,7 +501,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( return fmt.Errorf("unable to register conf: %w", err) } - launched := s.wg.Go(func(ctx context.Context) { + launched := s.wg.Go(context.Background(), func(ctx context.Context) { for { select { case <-confEvent.Confirmed: