From f6e213781157d63f6b889677fd88705e54d51abc Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 20 Jan 2025 23:33:03 +0800 Subject: [PATCH 1/5] sweep: rename `Failed` to `Fatal` This commit renames `Failed` to `Fatal` as it sounds too close to `PublishFailed`. We also wanna emphasize that inputs in this state won't be retried. --- sweep/sweeper.go | 33 ++++++++++++++++++--------------- sweep/sweeper_test.go | 40 ++++++++++++++++++++-------------------- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 88404e52a8..8f74a6da9e 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -119,9 +119,12 @@ const ( // sweeping transactions confirmed, the remaining two will be excluded. Excluded - // Failed is the state when a pending input has too many failed publish - // atttempts or unknown broadcast error is returned. - Failed + // Fatal is the final state of a pending input. Inputs ending in this + // state won't be retried. This could happen, + // - when a pending input has too many failed publish attempts; + // - the input has been spent by another party; + // - unknown broadcast error is returned. + Fatal ) // String gives a human readable text for the sweep states. @@ -145,8 +148,8 @@ func (s SweepState) String() string { case Excluded: return "Excluded" - case Failed: - return "Failed" + case Fatal: + return "Fatal" default: return "Unknown" @@ -215,7 +218,7 @@ func (p *SweeperInput) terminated() bool { // If the input has reached a final state, that it's either // been swept, or failed, or excluded, we will remove it from // our sweeper. - case Failed, Swept, Excluded: + case Fatal, Swept, Excluded: return true default: @@ -1264,7 +1267,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { ) if err != nil { err := fmt.Errorf("wait for spend: %w", err) - s.markInputFailed(pi, err) + s.markInputFatal(pi, err) return err } @@ -1477,12 +1480,12 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) { } } -// markInputFailed marks the given input as failed and won't be retried. It +// markInputFatal marks the given input as fatal and won't be retried. It // will also notify all the subscribers of this input. -func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) { +func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, err error) { log.Errorf("Failed to sweep input: %v, error: %v", pi, err) - pi.state = Failed + pi.state = Fatal s.signalResult(pi, Result{Err: err}) } @@ -1784,15 +1787,15 @@ func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error { } } - // Mark the inputs as failed. - s.markInputsFailed(resp.set, r.Err) + // Mark the inputs as fatal. + s.markInputsFatal(resp.set, r.Err) return nil } -// markInputsFailed marks all inputs found in the tx as failed. It will also +// markInputsFatal marks all inputs in the input set as failed. It will also // notify all the subscribers of these inputs. -func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) { +func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) { for _, inp := range set.Inputs() { outpoint := inp.OutPoint() @@ -1816,7 +1819,7 @@ func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) { continue } - s.markInputFailed(input, err) + s.markInputFatal(input, err) } } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 16a4a46fbe..40b25425db 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -215,7 +215,7 @@ func TestMarkInputsPublishFailed(t *testing.T) { // published. // - inputSwept specifies an input that's swept. // - inputExcluded specifies an input that's excluded. - // - inputFailed specifies an input that's failed. + // - inputFatal specifies an input that's fatal. var ( inputInit = createMockInput(t, s, Init) inputPendingPublish = createMockInput(t, s, PendingPublish) @@ -223,13 +223,13 @@ func TestMarkInputsPublishFailed(t *testing.T) { inputPublishFailed = createMockInput(t, s, PublishFailed) inputSwept = createMockInput(t, s, Swept) inputExcluded = createMockInput(t, s, Excluded) - inputFailed = createMockInput(t, s, Failed) + inputFatal = createMockInput(t, s, Fatal) ) // Gather all inputs. set.On("Inputs").Return([]input.Input{ inputInit, inputPendingPublish, inputPublished, - inputPublishFailed, inputSwept, inputExcluded, inputFailed, + inputPublishFailed, inputSwept, inputExcluded, inputFatal, }) // Mark the test inputs. We expect the non-exist input and the @@ -264,7 +264,7 @@ func TestMarkInputsPublishFailed(t *testing.T) { require.Equal(Excluded, s.inputs[inputExcluded.OutPoint()].state) // We expect the failed input to stay unchanged. - require.Equal(Failed, s.inputs[inputFailed.OutPoint()].state) + require.Equal(Fatal, s.inputs[inputFatal.OutPoint()].state) // Assert mocked statements are executed as expected. mockStore.AssertExpectations(t) @@ -437,7 +437,7 @@ func TestUpdateSweeperInputs(t *testing.T) { // These inputs won't hit RequiredLockTime so we won't mock. input4 := &SweeperInput{state: Swept, Input: inp1} input5 := &SweeperInput{state: Excluded, Input: inp1} - input6 := &SweeperInput{state: Failed, Input: inp1} + input6 := &SweeperInput{state: Fatal, Input: inp1} // Mock the input to have a locktime in the future so it will NOT be // returned. @@ -575,7 +575,7 @@ func TestDecideStateAndRBFInfo(t *testing.T) { require.Equal(Published, state) } -// TestMarkInputFailed checks that the input is marked as failed as expected. +// TestMarkInputFatal checks that the input is marked as expected. func TestMarkInputFailed(t *testing.T) { t.Parallel() @@ -596,10 +596,10 @@ func TestMarkInputFailed(t *testing.T) { } // Call the method under test. - s.markInputFailed(pi, errors.New("dummy error")) + s.markInputFatal(pi, errors.New("dummy error")) // Assert the state is updated. - require.Equal(t, Failed, pi.state) + require.Equal(t, Fatal, pi.state) } // TestSweepPendingInputs checks that `sweepPendingInputs` correctly executes @@ -1102,7 +1102,7 @@ func TestMarkInputsFailed(t *testing.T) { // published. // - inputSwept specifies an input that's swept. // - inputExcluded specifies an input that's excluded. - // - inputFailed specifies an input that's failed. + // - inputFatal specifies an input that's fatal. var ( inputInit = createMockInput(t, s, Init) inputPendingPublish = createMockInput(t, s, PendingPublish) @@ -1110,33 +1110,33 @@ func TestMarkInputsFailed(t *testing.T) { inputPublishFailed = createMockInput(t, s, PublishFailed) inputSwept = createMockInput(t, s, Swept) inputExcluded = createMockInput(t, s, Excluded) - inputFailed = createMockInput(t, s, Failed) + inputFatal = createMockInput(t, s, Fatal) ) // Gather all inputs. set.On("Inputs").Return([]input.Input{ inputInit, inputPendingPublish, inputPublished, - inputPublishFailed, inputSwept, inputExcluded, inputFailed, + inputPublishFailed, inputSwept, inputExcluded, inputFatal, }) // Mark the test inputs. We expect the non-exist input and - // inputSwept/inputExcluded/inputFailed to be skipped. - s.markInputsFailed(set, errDummy) + // inputSwept/inputExcluded/inputFatal to be skipped. + s.markInputsFatal(set, errDummy) // We expect unchanged number of pending inputs. require.Len(s.inputs, 7) - // We expect the init input's to be marked as failed. - require.Equal(Failed, s.inputs[inputInit.OutPoint()].state) + // We expect the init input's to be marked as fatal. + require.Equal(Fatal, s.inputs[inputInit.OutPoint()].state) // We expect the pending-publish input to be marked as failed. - require.Equal(Failed, s.inputs[inputPendingPublish.OutPoint()].state) + require.Equal(Fatal, s.inputs[inputPendingPublish.OutPoint()].state) - // We expect the published input to be marked as failed. - require.Equal(Failed, s.inputs[inputPublished.OutPoint()].state) + // We expect the published input to be marked as fatal. + require.Equal(Fatal, s.inputs[inputPublished.OutPoint()].state) // We expect the publish failed input to be markd as failed. - require.Equal(Failed, s.inputs[inputPublishFailed.OutPoint()].state) + require.Equal(Fatal, s.inputs[inputPublishFailed.OutPoint()].state) // We expect the swept input to stay unchanged. require.Equal(Swept, s.inputs[inputSwept.OutPoint()].state) @@ -1145,7 +1145,7 @@ func TestMarkInputsFailed(t *testing.T) { require.Equal(Excluded, s.inputs[inputExcluded.OutPoint()].state) // We expect the failed input to stay unchanged. - require.Equal(Failed, s.inputs[inputFailed.OutPoint()].state) + require.Equal(Fatal, s.inputs[inputFatal.OutPoint()].state) } // TestHandleBumpEventTxFatal checks that `handleBumpEventTxFatal` correctly From 0ac5bfa31be562e3ecd06e7ecf55398f7bd43532 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 21 Jan 2025 00:12:04 +0800 Subject: [PATCH 2/5] sweep: shorten `storeRecord` method signature This commit shortens the function signature of `storeRecord`, also makes sure we don't call `t.records.Store` directly but always using `storeRecord` instead so it's easier to trace the record creation. --- sweep/fee_bumper.go | 42 ++++++++++--------------- sweep/fee_bumper_test.go | 68 +++++++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 40 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 57f9f6e303..3325b0c74a 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -440,6 +440,20 @@ func (t *TxPublisher) storeInitialRecord(req *BumpRequest) ( return requestID, record } +// storeRecord stores the given record in the records map. +func (t *TxPublisher) storeRecord(requestID uint64, sweepCtx *sweepTxCtx, + req *BumpRequest, f FeeFunction) { + + // Register the record. + t.records.Store(requestID, &monitorRecord{ + tx: sweepCtx.tx, + req: req, + feeFunction: f, + fee: sweepCtx.fee, + outpointToTxIndex: sweepCtx.outpointToTxIndex, + }) +} + // NOTE: part of the `chainio.Consumer` interface. func (t *TxPublisher) Name() string { return "TxPublisher" @@ -508,10 +522,7 @@ func (t *TxPublisher) createRBFCompliantTx(requestID uint64, req *BumpRequest, switch { case err == nil: // The tx is valid, store it. - t.storeRecord( - requestID, sweepCtx.tx, req, f, sweepCtx.fee, - sweepCtx.outpointToTxIndex, - ) + t.storeRecord(requestID, sweepCtx, req, f) log.Infof("Created initial sweep tx=%v for %v inputs: "+ "feerate=%v, fee=%v, inputs:\n%v", @@ -565,21 +576,6 @@ func (t *TxPublisher) createRBFCompliantTx(requestID uint64, req *BumpRequest, } } -// storeRecord stores the given record in the records map. -func (t *TxPublisher) storeRecord(requestID uint64, tx *wire.MsgTx, - req *BumpRequest, f FeeFunction, fee btcutil.Amount, - outpointToTxIndex map[wire.OutPoint]int) { - - // Register the record. - t.records.Store(requestID, &monitorRecord{ - tx: tx, - req: req, - feeFunction: f, - fee: fee, - outpointToTxIndex: outpointToTxIndex, - }) -} - // createAndCheckTx creates a tx based on the given inputs, change output // script, and the fee rate. In addition, it validates the tx's mempool // acceptance before returning a tx that can be published directly, along with @@ -1195,13 +1191,7 @@ func (t *TxPublisher) createAndPublishTx(requestID uint64, // The tx has been created without any errors, we now register a new // record by overwriting the same requestID. - t.records.Store(requestID, &monitorRecord{ - tx: sweepCtx.tx, - req: r.req, - feeFunction: r.feeFunction, - fee: sweepCtx.fee, - outpointToTxIndex: sweepCtx.outpointToTxIndex, - }) + t.storeRecord(requestID, sweepCtx, r.req, r.feeFunction) // Attempt to broadcast this new tx. result, err := t.broadcast(requestID) diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 54c67dbe28..0ce83301a7 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -351,8 +351,15 @@ func TestStoreRecord(t *testing.T) { op: 0, } + // Create a sweepTxCtx. + sweepCtx := &sweepTxCtx{ + tx: tx, + fee: fee, + outpointToTxIndex: utxoIndex, + } + // Call the method under test. - tp.storeRecord(initialCounter, tx, req, feeFunc, fee, utxoIndex) + tp.storeRecord(initialCounter, sweepCtx, req, feeFunc) // Read the saved record and compare. record, ok := tp.records.Load(initialCounter) @@ -698,7 +705,15 @@ func TestTxPublisherBroadcast(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) requestID := uint64(1) - tp.storeRecord(requestID, tx, req, m.feeFunc, fee, utxoIndex) + + // Create a sweepTxCtx. + sweepCtx := &sweepTxCtx{ + tx: tx, + fee: fee, + outpointToTxIndex: utxoIndex, + } + + tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) // Quickly check when the requestID cannot be found, an error is // returned. @@ -796,6 +811,13 @@ func TestRemoveResult(t *testing.T) { // Create a test request ID counter. requestCounter := atomic.Uint64{} + // Create a sweepTxCtx. + sweepCtx := &sweepTxCtx{ + tx: tx, + fee: fee, + outpointToTxIndex: utxoIndex, + } + testCases := []struct { name string setupRecord func() uint64 @@ -808,9 +830,7 @@ func TestRemoveResult(t *testing.T) { name: "remove on TxConfirmed", setupRecord: func() uint64 { rid := requestCounter.Add(1) - tp.storeRecord( - rid, tx, req, m.feeFunc, fee, utxoIndex, - ) + tp.storeRecord(rid, sweepCtx, req, m.feeFunc) tp.subscriberChans.Store(rid, nil) return rid @@ -826,9 +846,7 @@ func TestRemoveResult(t *testing.T) { name: "remove on TxFailed", setupRecord: func() uint64 { rid := requestCounter.Add(1) - tp.storeRecord( - rid, tx, req, m.feeFunc, fee, utxoIndex, - ) + tp.storeRecord(rid, sweepCtx, req, m.feeFunc) tp.subscriberChans.Store(rid, nil) return rid @@ -845,9 +863,7 @@ func TestRemoveResult(t *testing.T) { name: "noop when tx is not confirmed or failed", setupRecord: func() uint64 { rid := requestCounter.Add(1) - tp.storeRecord( - rid, tx, req, m.feeFunc, fee, utxoIndex, - ) + tp.storeRecord(rid, sweepCtx, req, m.feeFunc) tp.subscriberChans.Store(rid, nil) return rid @@ -906,7 +922,15 @@ func TestNotifyResult(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) requestID := uint64(1) - tp.storeRecord(requestID, tx, req, m.feeFunc, fee, utxoIndex) + + // Create a sweepTxCtx. + sweepCtx := &sweepTxCtx{ + tx: tx, + fee: fee, + outpointToTxIndex: utxoIndex, + } + + tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) // Create a subscription to the event. subscriber := make(chan *BumpResult, 1) @@ -1208,7 +1232,15 @@ func TestHandleTxConfirmed(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) requestID := uint64(1) - tp.storeRecord(requestID, tx, req, m.feeFunc, fee, utxoIndex) + + // Create a sweepTxCtx. + sweepCtx := &sweepTxCtx{ + tx: tx, + fee: fee, + outpointToTxIndex: utxoIndex, + } + + tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) record, ok := tp.records.Load(requestID) require.True(t, ok) @@ -1289,7 +1321,15 @@ func TestHandleFeeBumpTx(t *testing.T) { // Create a testing record and put it in the map. fee := btcutil.Amount(1000) requestID := uint64(1) - tp.storeRecord(requestID, tx, req, m.feeFunc, fee, utxoIndex) + + // Create a sweepTxCtx. + sweepCtx := &sweepTxCtx{ + tx: tx, + fee: fee, + outpointToTxIndex: utxoIndex, + } + + tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) // Create a subscription to the event. subscriber := make(chan *BumpResult, 1) From 0ae9f1a72e1037a8be4b5f53f9664c0a4f8a7bf8 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 21 Jan 2025 01:19:02 +0800 Subject: [PATCH 3/5] sweep: add `requestID` to `monitorRecord` This way we can greatly simplify the method signatures, also paving the upcoming changes where we wanna make it clear when updating the monitorRecord, we only touch a portion of it. --- sweep/fee_bumper.go | 106 ++++++++++++++++++++++----------------- sweep/fee_bumper_test.go | 45 +++++++++++------ 2 files changed, 88 insertions(+), 63 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 3325b0c74a..7db98f99e7 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -410,34 +410,35 @@ func (t *TxPublisher) Broadcast(req *BumpRequest) <-chan *BumpResult { lnutils.SpewLogClosure(req)) // Store the request. - requestID, record := t.storeInitialRecord(req) + record := t.storeInitialRecord(req) // Create a chan to send the result to the caller. subscriber := make(chan *BumpResult, 1) - t.subscriberChans.Store(requestID, subscriber) + t.subscriberChans.Store(record.requestID, subscriber) // Publish the tx immediately if specified. if req.Immediate { - t.handleInitialBroadcast(record, requestID) + t.handleInitialBroadcast(record) } return subscriber } // storeInitialRecord initializes a monitor record and saves it in the map. -func (t *TxPublisher) storeInitialRecord(req *BumpRequest) ( - uint64, *monitorRecord) { - +func (t *TxPublisher) storeInitialRecord(req *BumpRequest) *monitorRecord { // Increase the request counter. // // NOTE: this is the only place where we increase the counter. requestID := t.requestCounter.Add(1) // Register the record. - record := &monitorRecord{req: req} + record := &monitorRecord{ + requestID: requestID, + req: req, + } t.records.Store(requestID, record) - return requestID, record + return record } // storeRecord stores the given record in the records map. @@ -446,6 +447,7 @@ func (t *TxPublisher) storeRecord(requestID uint64, sweepCtx *sweepTxCtx, // Register the record. t.records.Store(requestID, &monitorRecord{ + requestID: requestID, tx: sweepCtx.tx, req: req, feeFunction: f, @@ -461,16 +463,25 @@ func (t *TxPublisher) Name() string { // initializeTx initializes a fee function and creates an RBF-compliant tx. If // succeeded, the initial tx is stored in the records map. -func (t *TxPublisher) initializeTx(requestID uint64, req *BumpRequest) error { +func (t *TxPublisher) initializeTx(r *monitorRecord) error { // Create a fee bumping algorithm to be used for future RBF. - feeAlgo, err := t.initializeFeeFunction(req) + feeAlgo, err := t.initializeFeeFunction(r.req) if err != nil { return fmt.Errorf("init fee function: %w", err) } + // Attach the newly created fee function. + // + // TODO(yy): current we'd initialize a monitorRecord before creating the + // fee function, while we could instead create the fee function first + // then save it to the record. To make this happen we need to change the + // conf target calculation below since we would be initializing the fee + // function one block before. + r.feeFunction = feeAlgo + // Create the initial tx to be broadcasted. This tx is guaranteed to // comply with the RBF restrictions. - err = t.createRBFCompliantTx(requestID, req, feeAlgo) + err = t.createRBFCompliantTx(r) if err != nil { return fmt.Errorf("create RBF-compliant tx: %w", err) } @@ -511,24 +522,24 @@ func (t *TxPublisher) initializeFeeFunction( // so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee // and redo the process until the tx is valid, or return an error when non-RBF // related errors occur or the budget has been used up. -func (t *TxPublisher) createRBFCompliantTx(requestID uint64, req *BumpRequest, - f FeeFunction) error { +func (t *TxPublisher) createRBFCompliantTx(r *monitorRecord) error { + f := r.feeFunction for { // Create a new tx with the given fee rate and check its // mempool acceptance. - sweepCtx, err := t.createAndCheckTx(req, f) + sweepCtx, err := t.createAndCheckTx(r.req, f) switch { case err == nil: // The tx is valid, store it. - t.storeRecord(requestID, sweepCtx, req, f) + t.storeRecord(r.requestID, sweepCtx, r.req, f) log.Infof("Created initial sweep tx=%v for %v inputs: "+ "feerate=%v, fee=%v, inputs:\n%v", - sweepCtx.tx.TxHash(), len(req.Inputs), + sweepCtx.tx.TxHash(), len(r.req.Inputs), f.FeeRate(), sweepCtx.fee, - inputTypeSummary(req.Inputs)) + inputTypeSummary(r.req.Inputs)) return nil @@ -773,6 +784,9 @@ func (t *TxPublisher) handleResult(result *BumpResult) { // monitorRecord is used to keep track of the tx being monitored by the // publisher internally. type monitorRecord struct { + // requestID is the ID of the request that created this record. + requestID uint64 + // tx is the tx being monitored. tx *wire.MsgTx @@ -787,6 +801,10 @@ type monitorRecord struct { // outpointToTxIndex is a map of outpoint to tx index. outpointToTxIndex map[wire.OutPoint]int + + // spendNotifiers is a map of spend notifiers registered for all the + // inputs. + spendNotifiers map[wire.OutPoint]*chainntnfs.SpendEvent } // Start starts the publisher by subscribing to block epoch updates and kicking @@ -915,35 +933,35 @@ func (t *TxPublisher) processRecords() { t.records.ForEach(visitor) // Handle the initial broadcast. - for requestID, r := range initialRecords { - t.handleInitialBroadcast(r, requestID) + for _, r := range initialRecords { + t.handleInitialBroadcast(r) } // For records that are confirmed, we'll notify the caller about this // result. - for requestID, r := range confirmedRecords { + for _, r := range confirmedRecords { log.Debugf("Tx=%v is confirmed", r.tx.TxHash()) t.wg.Add(1) - go t.handleTxConfirmed(r, requestID) + go t.handleTxConfirmed(r) } // Get the current height to be used in the following goroutines. currentHeight := t.currentHeight.Load() // For records that are not confirmed, we perform a fee bump if needed. - for requestID, r := range feeBumpRecords { + for _, r := range feeBumpRecords { log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash()) t.wg.Add(1) - go t.handleFeeBumpTx(requestID, r, currentHeight) + go t.handleFeeBumpTx(r, currentHeight) } // For records that are failed, we'll notify the caller about this // result. - for requestID, r := range failedRecords { + for _, r := range failedRecords { log.Debugf("Tx=%v has inputs been spent by a third party, "+ "failing it now", r.tx.TxHash()) t.wg.Add(1) - go t.handleThirdPartySpent(r, requestID) + go t.handleThirdPartySpent(r) } } @@ -951,7 +969,7 @@ func (t *TxPublisher) processRecords() { // notify the subscriber then remove the record from the maps . // // NOTE: Must be run as a goroutine to avoid blocking on sending the result. -func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) { +func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) { defer t.wg.Done() // Create a result that will be sent to the resultChan which is @@ -959,7 +977,7 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) { result := &BumpResult{ Event: TxConfirmed, Tx: r.tx, - requestID: requestID, + requestID: r.requestID, Fee: r.fee, FeeRate: r.feeFunction.FeeRate(), } @@ -1017,10 +1035,8 @@ func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) { // 1. init a fee function based on the given strategy. // 2. create an RBF-compliant tx and monitor it for confirmation. // 3. notify the initial broadcast result back to the caller. -func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord, - requestID uint64) { - - log.Debugf("Initial broadcast for requestID=%v", requestID) +func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) { + log.Debugf("Initial broadcast for requestID=%v", r.requestID) var ( result *BumpResult @@ -1031,18 +1047,18 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord, // RBF rules. // // Create the initial tx to be broadcasted. - err = t.initializeTx(requestID, r.req) + err = t.initializeTx(r) if err != nil { log.Errorf("Initial broadcast failed: %v", err) // We now handle the initialization error and exit. - t.handleInitialTxError(requestID, err) + t.handleInitialTxError(r.requestID, err) return } // Successfully created the first tx, now broadcast it. - result, err = t.broadcast(requestID) + result, err = t.broadcast(r.requestID) if err != nil { // The broadcast failed, which can only happen if the tx record // cannot be found or the aux sweeper returns an error. In @@ -1051,7 +1067,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord, result = &BumpResult{ Event: TxFailed, Err: err, - requestID: requestID, + requestID: r.requestID, } } @@ -1062,9 +1078,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord, // attempt to bump the fee of the tx. // // NOTE: Must be run as a goroutine to avoid blocking on sending the result. -func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord, - currentHeight int32) { - +func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) { defer t.wg.Done() oldTxid := r.tx.TxHash() @@ -1095,7 +1109,7 @@ func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord, // The fee function now has a new fee rate, we will use it to bump the // fee of the tx. - resultOpt := t.createAndPublishTx(requestID, r) + resultOpt := t.createAndPublishTx(r) // If there's a result, we will notify the caller about the result. resultOpt.WhenSome(func(result BumpResult) { @@ -1109,9 +1123,7 @@ func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord, // and send a TxFailed event to the subscriber. // // NOTE: Must be run as a goroutine to avoid blocking on sending the result. -func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord, - requestID uint64) { - +func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord) { defer t.wg.Done() // Create a result that will be sent to the resultChan which is @@ -1123,7 +1135,7 @@ func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord, result := &BumpResult{ Event: TxFailed, Tx: r.tx, - requestID: requestID, + requestID: r.requestID, Err: ErrThirdPartySpent, } @@ -1134,7 +1146,7 @@ func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord, // createAndPublishTx creates a new tx with a higher fee rate and publishes it // to the network. It will update the record with the new tx and fee rate if // successfully created, and return the result when published successfully. -func (t *TxPublisher) createAndPublishTx(requestID uint64, +func (t *TxPublisher) createAndPublishTx( r *monitorRecord) fn.Option[BumpResult] { // Fetch the old tx. @@ -1185,16 +1197,16 @@ func (t *TxPublisher) createAndPublishTx(requestID uint64, Event: TxFailed, Tx: oldTx, Err: err, - requestID: requestID, + requestID: r.requestID, }) } // The tx has been created without any errors, we now register a new // record by overwriting the same requestID. - t.storeRecord(requestID, sweepCtx, r.req, r.feeFunction) + t.storeRecord(r.requestID, sweepCtx, r.req, r.feeFunction) // Attempt to broadcast this new tx. - result, err := t.broadcast(requestID) + result, err := t.broadcast(r.requestID) if err != nil { log.Infof("Failed to broadcast replacement tx %v: %v", sweepCtx.tx.TxHash(), err) diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 0ce83301a7..527e259538 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -664,11 +664,19 @@ func TestCreateRBFCompliantTx(t *testing.T) { tc := tc rid := requestCounter.Add(1) + + // Create a test record. + record := &monitorRecord{ + requestID: rid, + req: req, + feeFunction: m.feeFunc, + } + t.Run(tc.name, func(t *testing.T) { tc.setupMock() // Call the method under test. - err := tp.createRBFCompliantTx(rid, req, m.feeFunc) + err := tp.createRBFCompliantTx(record) // Check the result is as expected. require.ErrorIs(t, err, tc.expectedErr) @@ -1082,6 +1090,7 @@ func TestCreateAnPublishFail(t *testing.T) { // Overwrite the budget to make it smaller than the fee. req.Budget = 100 record := &monitorRecord{ + requestID: requestID, req: req, feeFunction: m.feeFunc, tx: &wire.MsgTx{}, @@ -1097,7 +1106,7 @@ func TestCreateAnPublishFail(t *testing.T) { mock.Anything).Return(script, nil) // Call the createAndPublish method. - resultOpt := tp.createAndPublishTx(requestID, record) + resultOpt := tp.createAndPublishTx(record) result := resultOpt.UnwrapOrFail(t) // We expect the result to be TxFailed and the error is set in the @@ -1116,7 +1125,7 @@ func TestCreateAnPublishFail(t *testing.T) { mock.Anything).Return(lnwallet.ErrMempoolFee).Once() // Call the createAndPublish method and expect a none option. - resultOpt = tp.createAndPublishTx(requestID, record) + resultOpt = tp.createAndPublishTx(record) require.True(t, resultOpt.IsNone()) // Mock the testmempoolaccept to return a fee related error that should @@ -1125,7 +1134,7 @@ func TestCreateAnPublishFail(t *testing.T) { mock.Anything).Return(chain.ErrInsufficientFee).Once() // Call the createAndPublish method and expect a none option. - resultOpt = tp.createAndPublishTx(requestID, record) + resultOpt = tp.createAndPublishTx(record) require.True(t, resultOpt.IsNone()) } @@ -1147,6 +1156,7 @@ func TestCreateAnPublishSuccess(t *testing.T) { // Create a testing monitor record. req := createTestBumpRequest() record := &monitorRecord{ + requestID: requestID, req: req, feeFunction: m.feeFunc, tx: &wire.MsgTx{}, @@ -1169,7 +1179,7 @@ func TestCreateAnPublishSuccess(t *testing.T) { mock.Anything, mock.Anything).Return(errDummy).Once() // Call the createAndPublish method and expect a failure result. - resultOpt := tp.createAndPublishTx(requestID, record) + resultOpt := tp.createAndPublishTx(record) result := resultOpt.UnwrapOrFail(t) // We expect the result to be TxFailed and the error is set. @@ -1190,7 +1200,7 @@ func TestCreateAnPublishSuccess(t *testing.T) { mock.Anything, mock.Anything).Return(nil).Once() // Call the createAndPublish method and expect a success result. - resultOpt = tp.createAndPublishTx(requestID, record) + resultOpt = tp.createAndPublishTx(record) result = resultOpt.UnwrapOrFail(t) require.True(t, resultOpt.IsSome()) @@ -1258,7 +1268,7 @@ func TestHandleTxConfirmed(t *testing.T) { tp.wg.Add(1) done := make(chan struct{}) go func() { - tp.handleTxConfirmed(record, requestID) + tp.handleTxConfirmed(record) close(done) }() @@ -1304,7 +1314,11 @@ func TestHandleFeeBumpTx(t *testing.T) { // Create a testing monitor record. req := createTestBumpRequest() + + // Create a testing record and put it in the map. + requestID := uint64(1) record := &monitorRecord{ + requestID: requestID, req: req, feeFunction: m.feeFunc, tx: tx, @@ -1317,10 +1331,7 @@ func TestHandleFeeBumpTx(t *testing.T) { utxoIndex := map[wire.OutPoint]int{ op: 0, } - - // Create a testing record and put it in the map. fee := btcutil.Amount(1000) - requestID := uint64(1) // Create a sweepTxCtx. sweepCtx := &sweepTxCtx{ @@ -1345,7 +1356,7 @@ func TestHandleFeeBumpTx(t *testing.T) { // Call the method and expect no result received. tp.wg.Add(1) - go tp.handleFeeBumpTx(requestID, record, testHeight) + go tp.handleFeeBumpTx(record, testHeight) // Check there's no result sent back. select { @@ -1359,7 +1370,7 @@ func TestHandleFeeBumpTx(t *testing.T) { // Call the method and expect no result received. tp.wg.Add(1) - go tp.handleFeeBumpTx(requestID, record, testHeight) + go tp.handleFeeBumpTx(record, testHeight) // Check there's no result sent back. select { @@ -1391,7 +1402,7 @@ func TestHandleFeeBumpTx(t *testing.T) { // // NOTE: must be called in a goroutine in case it blocks. tp.wg.Add(1) - go tp.handleFeeBumpTx(requestID, record, testHeight) + go tp.handleFeeBumpTx(record, testHeight) select { case <-time.After(time.Second): @@ -1437,6 +1448,7 @@ func TestProcessRecords(t *testing.T) { // Create a monitor record that's confirmed. recordConfirmed := &monitorRecord{ + requestID: requestID1, req: req1, feeFunction: m.feeFunc, tx: tx1, @@ -1450,6 +1462,7 @@ func TestProcessRecords(t *testing.T) { // Create a monitor record that's not confirmed. We know it's not // confirmed because the num of confirms is zero. recordFeeBump := &monitorRecord{ + requestID: requestID2, req: req2, feeFunction: m.feeFunc, tx: tx2, @@ -1588,7 +1601,7 @@ func TestHandleInitialBroadcastSuccess(t *testing.T) { // Call the method under test. tp.wg.Add(1) - tp.handleInitialBroadcast(rec, rid) + tp.handleInitialBroadcast(rec) // Check the result is sent back. select { @@ -1659,7 +1672,7 @@ func TestHandleInitialBroadcastFail(t *testing.T) { // Call the method under test and expect an error returned. tp.wg.Add(1) - tp.handleInitialBroadcast(rec, rid) + tp.handleInitialBroadcast(rec) // Check the result is sent back. select { @@ -1692,7 +1705,7 @@ func TestHandleInitialBroadcastFail(t *testing.T) { // Call the method under test. tp.wg.Add(1) - tp.handleInitialBroadcast(rec, rid) + tp.handleInitialBroadcast(rec) // Check the result is sent back. select { From ed4c7f772bd84c3a58cc35c3fb929bdf27643c9a Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 21 Jan 2025 01:50:58 +0800 Subject: [PATCH 4/5] sweep: refactor `storeRecord` to `updateRecord` To make it clear we are only updating fields, which will be handy for the following commit where we start tracking for spending notifications. --- sweep/fee_bumper.go | 66 +++++++++++++---------------- sweep/fee_bumper_test.go | 89 ++++++++++++++++++++++++++++++++-------- 2 files changed, 100 insertions(+), 55 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index 7db98f99e7..620ad7554b 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -441,19 +441,19 @@ func (t *TxPublisher) storeInitialRecord(req *BumpRequest) *monitorRecord { return record } -// storeRecord stores the given record in the records map. -func (t *TxPublisher) storeRecord(requestID uint64, sweepCtx *sweepTxCtx, - req *BumpRequest, f FeeFunction) { +// updateRecord updates the given record's tx and fee, and saves it in the +// records map. +func (t *TxPublisher) updateRecord(r *monitorRecord, + sweepCtx *sweepTxCtx) *monitorRecord { + + r.tx = sweepCtx.tx + r.fee = sweepCtx.fee + r.outpointToTxIndex = sweepCtx.outpointToTxIndex // Register the record. - t.records.Store(requestID, &monitorRecord{ - requestID: requestID, - tx: sweepCtx.tx, - req: req, - feeFunction: f, - fee: sweepCtx.fee, - outpointToTxIndex: sweepCtx.outpointToTxIndex, - }) + t.records.Store(r.requestID, r) + + return r } // NOTE: part of the `chainio.Consumer` interface. @@ -463,11 +463,11 @@ func (t *TxPublisher) Name() string { // initializeTx initializes a fee function and creates an RBF-compliant tx. If // succeeded, the initial tx is stored in the records map. -func (t *TxPublisher) initializeTx(r *monitorRecord) error { +func (t *TxPublisher) initializeTx(r *monitorRecord) (*monitorRecord, error) { // Create a fee bumping algorithm to be used for future RBF. feeAlgo, err := t.initializeFeeFunction(r.req) if err != nil { - return fmt.Errorf("init fee function: %w", err) + return nil, fmt.Errorf("init fee function: %w", err) } // Attach the newly created fee function. @@ -481,12 +481,12 @@ func (t *TxPublisher) initializeTx(r *monitorRecord) error { // Create the initial tx to be broadcasted. This tx is guaranteed to // comply with the RBF restrictions. - err = t.createRBFCompliantTx(r) + record, err := t.createRBFCompliantTx(r) if err != nil { - return fmt.Errorf("create RBF-compliant tx: %w", err) + return nil, fmt.Errorf("create RBF-compliant tx: %w", err) } - return nil + return record, nil } // initializeFeeFunction initializes a fee function to be used for this request @@ -522,7 +522,9 @@ func (t *TxPublisher) initializeFeeFunction( // so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee // and redo the process until the tx is valid, or return an error when non-RBF // related errors occur or the budget has been used up. -func (t *TxPublisher) createRBFCompliantTx(r *monitorRecord) error { +func (t *TxPublisher) createRBFCompliantTx( + r *monitorRecord) (*monitorRecord, error) { + f := r.feeFunction for { @@ -533,7 +535,7 @@ func (t *TxPublisher) createRBFCompliantTx(r *monitorRecord) error { switch { case err == nil: // The tx is valid, store it. - t.storeRecord(r.requestID, sweepCtx, r.req, f) + record := t.updateRecord(r, sweepCtx) log.Infof("Created initial sweep tx=%v for %v inputs: "+ "feerate=%v, fee=%v, inputs:\n%v", @@ -541,7 +543,7 @@ func (t *TxPublisher) createRBFCompliantTx(r *monitorRecord) error { f.FeeRate(), sweepCtx.fee, inputTypeSummary(r.req.Inputs)) - return nil + return record, nil // If the error indicates the fees paid is not enough, we will // ask the fee function to increase the fee rate and retry. @@ -572,7 +574,7 @@ func (t *TxPublisher) createRBFCompliantTx(r *monitorRecord) error { // cluster these inputs differetly. increased, err = f.Increment() if err != nil { - return err + return nil, err } } @@ -582,7 +584,7 @@ func (t *TxPublisher) createRBFCompliantTx(r *monitorRecord) error { // mempool acceptance. default: log.Debugf("Failed to create RBF-compliant tx: %v", err) - return err + return nil, err } } } @@ -645,13 +647,7 @@ func (t *TxPublisher) createAndCheckTx(req *BumpRequest, // the event channel to the record. Any broadcast-related errors will not be // returned here, instead, they will be put inside the `BumpResult` and // returned to the caller. -func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) { - // Get the record being monitored. - record, ok := t.records.Load(requestID) - if !ok { - return nil, fmt.Errorf("tx record %v not found", requestID) - } - +func (t *TxPublisher) broadcast(record *monitorRecord) (*BumpResult, error) { txid := record.tx.TxHash() tx := record.tx @@ -698,7 +694,7 @@ func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) { Fee: record.fee, FeeRate: record.feeFunction.FeeRate(), Err: err, - requestID: requestID, + requestID: record.requestID, } return result, nil @@ -801,10 +797,6 @@ type monitorRecord struct { // outpointToTxIndex is a map of outpoint to tx index. outpointToTxIndex map[wire.OutPoint]int - - // spendNotifiers is a map of spend notifiers registered for all the - // inputs. - spendNotifiers map[wire.OutPoint]*chainntnfs.SpendEvent } // Start starts the publisher by subscribing to block epoch updates and kicking @@ -1047,7 +1039,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) { // RBF rules. // // Create the initial tx to be broadcasted. - err = t.initializeTx(r) + record, err := t.initializeTx(r) if err != nil { log.Errorf("Initial broadcast failed: %v", err) @@ -1058,7 +1050,7 @@ func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) { } // Successfully created the first tx, now broadcast it. - result, err = t.broadcast(r.requestID) + result, err = t.broadcast(record) if err != nil { // The broadcast failed, which can only happen if the tx record // cannot be found or the aux sweeper returns an error. In @@ -1203,10 +1195,10 @@ func (t *TxPublisher) createAndPublishTx( // The tx has been created without any errors, we now register a new // record by overwriting the same requestID. - t.storeRecord(r.requestID, sweepCtx, r.req, r.feeFunction) + record := t.updateRecord(r, sweepCtx) // Attempt to broadcast this new tx. - result, err := t.broadcast(r.requestID) + result, err := t.broadcast(record) if err != nil { log.Infof("Failed to broadcast replacement tx %v: %v", sweepCtx.tx.TxHash(), err) diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index 527e259538..0531dec8d9 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -313,9 +313,9 @@ func TestInitializeFeeFunction(t *testing.T) { require.Equal(t, feerate, f.FeeRate()) } -// TestStoreRecord correctly increases the request counter and saves the +// TestUpdateRecord correctly updates the fields fee and tx, and saves the // record. -func TestStoreRecord(t *testing.T) { +func TestUpdateRecord(t *testing.T) { t.Parallel() // Create a test input. @@ -358,8 +358,15 @@ func TestStoreRecord(t *testing.T) { outpointToTxIndex: utxoIndex, } + // Create a test record. + record := &monitorRecord{ + requestID: initialCounter, + req: req, + feeFunction: feeFunc, + } + // Call the method under test. - tp.storeRecord(initialCounter, sweepCtx, req, feeFunc) + tp.updateRecord(record, sweepCtx) // Read the saved record and compare. record, ok := tp.records.Load(initialCounter) @@ -676,10 +683,19 @@ func TestCreateRBFCompliantTx(t *testing.T) { tc.setupMock() // Call the method under test. - err := tp.createRBFCompliantTx(record) + rec, err := tp.createRBFCompliantTx(record) // Check the result is as expected. require.ErrorIs(t, err, tc.expectedErr) + + if tc.expectedErr != nil { + return + } + + // Assert the returned record has the following fields + // populated. + require.NotEmpty(t, rec.tx) + require.NotEmpty(t, rec.fee) }) } } @@ -721,13 +737,13 @@ func TestTxPublisherBroadcast(t *testing.T) { outpointToTxIndex: utxoIndex, } - tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) - - // Quickly check when the requestID cannot be found, an error is - // returned. - result, err := tp.broadcast(uint64(1000)) - require.Error(t, err) - require.Nil(t, result) + // Create a test record. + record := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + } + rec := tp.updateRecord(record, sweepCtx) testCases := []struct { name string @@ -782,7 +798,7 @@ func TestTxPublisherBroadcast(t *testing.T) { tc.setupMock() // Call the method under test. - result, err := tp.broadcast(requestID) + result, err := tp.broadcast(rec) // Check the result is as expected. require.ErrorIs(t, err, tc.expectedErr) @@ -838,7 +854,15 @@ func TestRemoveResult(t *testing.T) { name: "remove on TxConfirmed", setupRecord: func() uint64 { rid := requestCounter.Add(1) - tp.storeRecord(rid, sweepCtx, req, m.feeFunc) + + // Create a test record. + record := &monitorRecord{ + requestID: rid, + req: req, + feeFunction: m.feeFunc, + } + + tp.updateRecord(record, sweepCtx) tp.subscriberChans.Store(rid, nil) return rid @@ -854,7 +878,15 @@ func TestRemoveResult(t *testing.T) { name: "remove on TxFailed", setupRecord: func() uint64 { rid := requestCounter.Add(1) - tp.storeRecord(rid, sweepCtx, req, m.feeFunc) + + // Create a test record. + record := &monitorRecord{ + requestID: rid, + req: req, + feeFunction: m.feeFunc, + } + + tp.updateRecord(record, sweepCtx) tp.subscriberChans.Store(rid, nil) return rid @@ -871,7 +903,15 @@ func TestRemoveResult(t *testing.T) { name: "noop when tx is not confirmed or failed", setupRecord: func() uint64 { rid := requestCounter.Add(1) - tp.storeRecord(rid, sweepCtx, req, m.feeFunc) + + // Create a test record. + record := &monitorRecord{ + requestID: rid, + req: req, + feeFunction: m.feeFunc, + } + + tp.updateRecord(record, sweepCtx) tp.subscriberChans.Store(rid, nil) return rid @@ -937,8 +977,14 @@ func TestNotifyResult(t *testing.T) { fee: fee, outpointToTxIndex: utxoIndex, } + // Create a test record. + record := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + } - tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) + tp.updateRecord(record, sweepCtx) // Create a subscription to the event. subscriber := make(chan *BumpResult, 1) @@ -1250,7 +1296,14 @@ func TestHandleTxConfirmed(t *testing.T) { outpointToTxIndex: utxoIndex, } - tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) + // Create a test record. + record := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: m.feeFunc, + } + + tp.updateRecord(record, sweepCtx) record, ok := tp.records.Load(requestID) require.True(t, ok) @@ -1340,7 +1393,7 @@ func TestHandleFeeBumpTx(t *testing.T) { outpointToTxIndex: utxoIndex, } - tp.storeRecord(requestID, sweepCtx, req, m.feeFunc) + tp.updateRecord(record, sweepCtx) // Create a subscription to the event. subscriber := make(chan *BumpResult, 1) From a738e7fc2926edcf304c26d4d7d865b82c1e1e05 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 26 Jan 2025 09:58:06 +0800 Subject: [PATCH 5/5] docs: update release notes --- docs/release-notes/release-notes-0.19.0.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index fb1a6ebf36..8450416a9e 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -86,6 +86,7 @@ ## Functional Enhancements * [Add ability](https://github.com/lightningnetwork/lnd/pull/8998) to paginate wallet transactions. + ## RPC Additions * [Add a new rpc endpoint](https://github.com/lightningnetwork/lnd/pull/8843) @@ -291,6 +292,10 @@ The underlying functionality between those two options remain the same. StateMachine](https://github.com/lightningnetwork/lnd/pull/9342) to use the new GoroutineManager API along with structured logging. +* A minor [refactor](https://github.com/lightningnetwork/lnd/pull/9446) is done + to the sweeper to improve code quality, with a renaming of the internal state + (`Failed` -> `Fatal`) used by the inputs tracked in the sweeper. + ## Tooling and Documentation * [Improved `lncli create` command help text](https://github.com/lightningnetwork/lnd/pull/9077)