Skip to content

Commit

Permalink
go.mod+htlcswitch+protofsm: update fn package to v2.0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
Crypt-iQ committed Jan 7, 2025
1 parent a388c1f commit 5ab7582
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
github.com/lightningnetwork/lnd/cert v1.2.2
github.com/lightningnetwork/lnd/clock v1.1.1
github.com/lightningnetwork/lnd/fn/v2 v2.0.2
github.com/lightningnetwork/lnd/fn/v2 v2.0.7
github.com/lightningnetwork/lnd/healthcheck v1.2.6
github.com/lightningnetwork/lnd/kvdb v1.4.12
github.com/lightningnetwork/lnd/queue v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
github.com/lightningnetwork/lnd/fn/v2 v2.0.2 h1:M7o2lYrh/zCp+lntPB3WP/rWTu5U+4ssyHW+kqNJ0fs=
github.com/lightningnetwork/lnd/fn/v2 v2.0.2/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
github.com/lightningnetwork/lnd/fn/v2 v2.0.7 h1:2LkgcGk20vXcUJyrlYLWMptnEouOBnCixskMsQW+GxU=
github.com/lightningnetwork/lnd/fn/v2 v2.0.7/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI=
github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ=
github.com/lightningnetwork/lnd/kvdb v1.4.12 h1:Y0WY5Tbjyjn6eCYh068qkWur5oFtioJlfxc8w5SlJeQ=
Expand Down
41 changes: 21 additions & 20 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package htlcswitch

import (
"bytes"
"context"
crand "crypto/rand"
"crypto/sha256"
"errors"
Expand Down Expand Up @@ -596,7 +597,7 @@ func (l *channelLink) Start() error {

l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

l.Wg.Add(1)
l.WgAdd(1)
go l.htlcManager()

return nil
Expand Down Expand Up @@ -636,8 +637,8 @@ func (l *channelLink) Stop() {
l.hodlQueue.Stop()
}

close(l.Quit)
l.Wg.Wait()
l.Quit()
l.WgWait()

// Now that the htlcManager has completely exited, reset the packet
// courier. This allows the mailbox to revaluate any lingering Adds that
Expand All @@ -662,7 +663,7 @@ func (l *channelLink) Stop() {
// WaitForShutdown blocks until the link finishes shutting down, which includes
// termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
l.Wg.Wait()
l.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
Expand Down Expand Up @@ -740,7 +741,7 @@ func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
func (l *channelLink) OnFlushedOnce(hook func()) {
select {
case l.flushHooks.newTransients <- hook:
case <-l.Quit:
case <-l.Done():
}
}

Expand All @@ -759,7 +760,7 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {

select {
case queue <- hook:
case <-l.Quit:
case <-l.Done():
}
}

Expand All @@ -777,7 +778,7 @@ func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {

select {
case l.quiescenceReqs <- req:
case <-l.Quit:
case <-l.Done():
req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
}

Expand Down Expand Up @@ -989,7 +990,7 @@ func (l *channelLink) syncChanStates() error {
// We've just received a ChanSync message from the remote
// party, so we'll process the message in order to determine
// if we need to re-transmit any messages to the remote party.
ctx, cancel := l.WithCtxQuitNoTimeout()
ctx, cancel := l.Create(context.Background())
defer cancel()
msgsToReSend, openedCircuits, closedCircuits, err =
l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
Expand Down Expand Up @@ -1021,7 +1022,7 @@ func (l *channelLink) syncChanStates() error {
l.cfg.Peer.SendMessage(false, msg)
}

case <-l.Quit:
case <-l.Done():
return ErrLinkShuttingDown
}

Expand Down Expand Up @@ -1111,7 +1112,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
defer l.Wg.Done()
defer l.WgDone()

l.cfg.FwdPkgGCTicker.Resume()
defer l.cfg.FwdPkgGCTicker.Stop()
Expand All @@ -1128,7 +1129,7 @@ func (l *channelLink) fwdPkgGarbager() {
err)
continue
}
case <-l.Quit:
case <-l.Done():
return
}
}
Expand Down Expand Up @@ -1251,7 +1252,7 @@ func (l *channelLink) handleChanSyncErr(err error) {
func (l *channelLink) htlcManager() {
defer func() {
l.cfg.BatchTicker.Stop()
l.Wg.Done()
l.WgDone()
l.log.Infof("exited")
}()

Expand Down Expand Up @@ -1345,7 +1346,7 @@ func (l *channelLink) htlcManager() {
// With our link's in-memory state fully reconstructed, spawn a
// goroutine to manage the reclamation of disk space occupied by
// completed forwarding packages.
l.Wg.Add(1)
l.WgAdd(1)
go l.fwdPkgGarbager()
}

Expand Down Expand Up @@ -1543,7 +1544,7 @@ func (l *channelLink) htlcManager() {
}
}

case <-l.Quit:
case <-l.Done():
return
}
}
Expand Down Expand Up @@ -2418,7 +2419,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
}

select {
case <-l.Quit:
case <-l.Done():
return
default:
}
Expand Down Expand Up @@ -2488,7 +2489,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
}

select {
case <-l.Quit:
case <-l.Done():
return
default:
}
Expand Down Expand Up @@ -2782,7 +2783,7 @@ func (l *channelLink) updateCommitTx() error {
return nil
}

ctx, done := l.WithCtxQuitNoTimeout()
ctx, done := l.Create(context.Background())
defer done()

newCommit, err := l.channel.SignNextCommitment(ctx)
Expand Down Expand Up @@ -2822,7 +2823,7 @@ func (l *channelLink) updateCommitTx() error {
}

select {
case <-l.Quit:
case <-l.Done():
return ErrLinkShuttingDown
default:
}
Expand Down Expand Up @@ -3529,7 +3530,7 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
select {
case <-l.Quit:
case <-l.Done():
// Return early if the link is already in the process of
// quitting. It doesn't make sense to hand the message to the
// mailbox here.
Expand Down Expand Up @@ -4290,7 +4291,7 @@ func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
filteredPkts = append(filteredPkts, pkt)
}

err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...)
err := l.cfg.ForwardPackets(l.Done(), replay, filteredPkts...)
if err != nil {
log.Errorf("Unhandled error while reforwarding htlc "+
"settle/fail over htlcswitch: %v", err)
Expand Down
14 changes: 7 additions & 7 deletions htlcswitch/link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2257,7 +2257,7 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt,
for {
select {
case <-notifyUpdateChan:
case <-chanLink.Quit:
case <-chanLink.Done():
close(doneChan)
return
}
Expand Down Expand Up @@ -2326,7 +2326,7 @@ func handleStateUpdate(link *channelLink,
}
link.HandleChannelUpdate(remoteRev)

ctx, done := link.WithCtxQuitNoTimeout()
ctx, done := link.Create(context.Background())
defer done()

remoteSigs, err := remoteChannel.SignNextCommitment(ctx)
Expand Down Expand Up @@ -2372,15 +2372,15 @@ func updateState(batchTick chan time.Time, link *channelLink,
// Trigger update by ticking the batchTicker.
select {
case batchTick <- time.Now():
case <-link.Quit:
case <-link.Done():
return fmt.Errorf("link shutting down")
}
return handleStateUpdate(link, remoteChannel)
}

// The remote is triggering the state update, emulate this by
// signing and sending CommitSig to the link.
ctx, done := link.WithCtxQuitNoTimeout()
ctx, done := link.Create(context.Background())
defer done()

remoteSigs, err := remoteChannel.SignNextCommitment(ctx)
Expand Down Expand Up @@ -4946,7 +4946,7 @@ func (h *persistentLinkHarness) restartLink(
for {
select {
case <-notifyUpdateChan:
case <-chanLink.Quit:
case <-chanLink.Done():
close(doneChan)
return
}
Expand Down Expand Up @@ -5932,7 +5932,7 @@ func TestChannelLinkFail(t *testing.T) {

// Sign a commitment that will include
// signature for the HTLC just sent.
quitCtx, done := c.WithCtxQuitNoTimeout()
quitCtx, done := c.Create(context.Background())
defer done()

sigs, err := remoteChannel.SignNextCommitment(
Expand Down Expand Up @@ -5979,7 +5979,7 @@ func TestChannelLinkFail(t *testing.T) {

// Sign a commitment that will include
// signature for the HTLC just sent.
quitCtx, done := c.WithCtxQuitNoTimeout()
quitCtx, done := c.Create(context.Background())
defer done()

sigs, err := remoteChannel.SignNextCommitment(
Expand Down
2 changes: 1 addition & 1 deletion htlcswitch/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
for {
select {
case <-notifyUpdateChan:
case <-chanLink.Quit:
case <-chanLink.Done():
close(doneChan)
return
}
Expand Down
13 changes: 6 additions & 7 deletions protofsm/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,11 @@ type StateMachineCfg[Event any, Env Environment] struct {
// state such as a txid confirmation event.
func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:ll
) StateMachine[Event, Env] {

return StateMachine[Event, Env]{
cfg: cfg,
events: make(chan Event, 1),
stateQuery: make(chan stateQuery[Event, Env]),
wg: *fn.NewGoroutineManager(context.Background()),
wg: *fn.NewGoroutineManager(),
newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
quit: make(chan struct{}),
}
Expand All @@ -210,7 +209,7 @@ func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env]
// the state machine to completion.
func (s *StateMachine[Event, Env]) Start() {
s.startOnce.Do(func() {
_ = s.wg.Go(func(ctx context.Context) {
_ = s.wg.Go(context.Background(), func(ctx context.Context) {
s.driveMachine()
})
})
Expand Down Expand Up @@ -355,7 +354,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
// If a post-send event was specified, then we'll funnel
// that back into the main state machine now as well.
return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
launched := s.wg.Go(func(ctx context.Context) {
launched := s.wg.Go(context.Background(), func(ctx context.Context) {

Check failure on line 357 in protofsm/state_machine.go

View workflow job for this annotation

GitHub Actions / lint code

the line is 101 characters long, which exceeds the maximum of 80 characters. (ll)
log.Debugf("FSM(%v): sending "+
"post-send event: %v",
s.cfg.Env.Name(),
Expand All @@ -382,7 +381,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
// Otherwise, this has a SendWhen predicate, so we'll need
// launch a goroutine to poll the SendWhen, then send only once
// the predicate is true.
launched := s.wg.Go(func(ctx context.Context) {
launched := s.wg.Go(context.Background(), func(ctx context.Context) {

Check failure on line 384 in protofsm/state_machine.go

View workflow job for this annotation

GitHub Actions / lint code

the line is 85 characters long, which exceeds the maximum of 80 characters. (ll)
predicateTicker := time.NewTicker(
s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
)
Expand Down Expand Up @@ -456,7 +455,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
return fmt.Errorf("unable to register spend: %w", err)
}

launched := s.wg.Go(func(ctx context.Context) {
launched := s.wg.Go(context.Background(), func(ctx context.Context) {

Check failure on line 458 in protofsm/state_machine.go

View workflow job for this annotation

GitHub Actions / lint code

the line is 85 characters long, which exceeds the maximum of 80 characters. (ll)
for {
select {
case spend, ok := <-spendEvent.Spend:
Expand Down Expand Up @@ -502,7 +501,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
return fmt.Errorf("unable to register conf: %w", err)
}

launched := s.wg.Go(func(ctx context.Context) {
launched := s.wg.Go(context.Background(), func(ctx context.Context) {

Check failure on line 504 in protofsm/state_machine.go

View workflow job for this annotation

GitHub Actions / lint code

the line is 85 characters long, which exceeds the maximum of 80 characters. (ll)
for {
select {
case <-confEvent.Confirmed:
Expand Down

0 comments on commit 5ab7582

Please sign in to comment.