From 5792caf68c4c670843011df8df7e9afc21bb029b Mon Sep 17 00:00:00 2001 From: yukang Date: Mon, 6 Jan 2025 00:29:45 +0800 Subject: [PATCH] add handle add_tlc result for forwarding tlc --- src/fiber/channel.rs | 226 ++++++++++++++++++++++++++++--------- src/fiber/network.rs | 12 +- src/fiber/tests/channel.rs | 4 +- src/fiber/tests/payment.rs | 6 +- src/fiber/types.rs | 13 ++- 5 files changed, 196 insertions(+), 65 deletions(-) diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 8426b7c13..6aac40b15 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -66,6 +66,7 @@ use tentacle::secio::PeerId; use thiserror::Error; use tokio::sync::oneshot; +use super::types::ForwardTlcResult; use std::{ collections::HashSet, fmt::{self, Debug}, @@ -146,6 +147,7 @@ pub enum ChannelCommand { ), Shutdown(ShutdownCommand, RpcReplyPort>), Update(UpdateCommand, RpcReplyPort>), + ForwardTlcResult(ForwardTlcResult), #[cfg(test)] ReloadState(), } @@ -718,7 +720,7 @@ where } } - async fn handle_add_tlc_error( + async fn process_add_tlc_error( &self, myself: &ActorRef, state: &mut ChannelActorState, @@ -775,7 +777,7 @@ where for add_tlc in apply_tlcs { assert!(add_tlc.is_received()); if let Err(error) = self.apply_add_tlc_operation(myself, state, &add_tlc).await { - self.handle_add_tlc_error( + self.process_add_tlc_error( myself, state, add_tlc.payment_hash, @@ -1148,6 +1150,7 @@ where peeled_onion_packet, previous_tlc: Some((state.get_id(), added_tlc_id)), payment_hash, + wait_for_add_tlc_reply: false, }, rpc_reply, ), @@ -1479,7 +1482,7 @@ where peeled_onion_packet: PeeledPaymentOnionPacket, ) { let forward_tlc = - RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, peeled_onion_packet); + RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, peeled_onion_packet, true); self.register_retryable_tlc_operation(myself, state, forward_tlc) .await; } @@ -1499,6 +1502,22 @@ where } } + fn set_forward_tlc_status( + &self, + state: &mut ChannelActorState, + payment_hash: Hash256, + try_one_time: bool, + ) { + if let Some(RetryableTlcOperation::ForwardTlc(_, _, _, ref mut sent)) = state + .tlc_state + .retryable_tlc_operations + .iter_mut() + .find(|op| matches!(op, RetryableTlcOperation::ForwardTlc(ph, _, _, _) if *ph == payment_hash)) + { + *sent = try_one_time; + } + } + pub async fn apply_retryable_tlc_operations( &self, myself: &ActorRef, @@ -1506,7 +1525,7 @@ where ) { let pending_tlc_ops = state.tlc_state.get_pending_operations(); for retryable_operation in pending_tlc_ops.into_iter() { - let need_retry = match retryable_operation { + let keep = match retryable_operation { RetryableTlcOperation::RemoveTlc(tlc_id, ref reason) => { match self.handle_remove_tlc_command( state, @@ -1516,20 +1535,8 @@ where }, ) { Ok(_) | Err(ProcessingChannelError::RepeatedProcessing(_)) => false, - Err(ProcessingChannelError::WaitingTlcAck) => { - error!( - "Failed to remove tlc: {:?} because of WaitingTlcAck, retry it later", - &retryable_operation - ); - true - } - Err(err) => { - error!( - "Failed to remove tlc: {:?} with reason: {:?}, will not retry", - &retryable_operation, err - ); - false - } + Err(ProcessingChannelError::WaitingTlcAck) => true, + Err(_err) => false, } } RetryableTlcOperation::RelayRemoveTlc(channel_id, tlc_id, ref reason) => { @@ -1557,45 +1564,47 @@ where payment_hash, tlc_id, ref peeled_onion_packet, + try_one_time, ) => { - let res = self + // there is a potential deadlock for waiting the result from another channel actor + // for the scenario these two things happen at the same time: + // 1. channel A send forward tlc to channel B + // 2. channel B send forward tlc to channel A + // we may end up waiting for each other forever + // + // but we need the result for better error handling + // so we introduce the ForwardTlcResult to get the resule based on actor message + + if !try_one_time { + // we need to decide whether to retry it until we get ForwardTlcResult + continue; + } + match self .handle_forward_onion_packet( state, payment_hash, peeled_onion_packet.clone(), tlc_id.into(), ) - .await; - match res { - Ok(_) => false, - Err(err) => match err { - ProcessingChannelError::WaitingTlcAck - | ProcessingChannelError::TlcForwardingError(TlcErr { - error_code: TlcErrorCode::WaitingTlcAck, - .. - }) => { - error!( - "Failed to forward tlc: {:?} because of WaitingTlcAck, retry it later", - &payment_hash - ); - true - } - _ => { - let shared_secret = peeled_onion_packet.shared_secret.clone(); - let err = err.with_shared_secret(shared_secret); - self.handle_add_tlc_error(myself, state, payment_hash, tlc_id, err) - .await; - error!( - "Failed to forward tlc: {:?} because of TlcForwardingError, will not retry", - &payment_hash - ); - false - } - }, + .await + { + Ok(_) => { + // here we just make sure the forward tlc is sent, we don't need to wait for the result + // retry it if necessary until we get ForwardTlcResult + self.set_forward_tlc_status(state, payment_hash, false); + true + } + Err(err) => { + let shared_secret = peeled_onion_packet.shared_secret.clone(); + let err = err.with_shared_secret(shared_secret); + self.process_add_tlc_error(myself, state, payment_hash, tlc_id, err) + .await; + false + } } } }; - if !need_retry { + if !keep { state .tlc_state .remove_pending_tlc_operation(&retryable_operation); @@ -1603,13 +1612,59 @@ where } // If there are more pending removes, we will retry it later - if !state.tlc_state.get_pending_operations().is_empty() { + if state.tlc_state.has_pending_operations() { myself.send_after(RETRYABLE_TLC_OPS_INTERVAL, || { ChannelActorMessage::Event(ChannelEvent::CheckTlcRetryOperation) }); } } + async fn handle_forward_tlc_result( + &self, + myself: &ActorRef, + state: &mut ChannelActorState, + result: ForwardTlcResult, + ) { + let pending_ops = state.tlc_state.get_pending_operations(); + if let Some((op, peeled_onion)) = pending_ops.iter().find_map(|op| match op { + RetryableTlcOperation::ForwardTlc(payment_hash, _, peel_onion_packet, _) + if *payment_hash == result.payment_hash => + { + Some((op, peel_onion_packet)) + } + _ => None, + }) { + if let Some(tlc_err) = result.tlc_err { + match result.channel_error { + ProcessingChannelError::WaitingTlcAck => { + // if we get WaitingTlcAck error, we will retry it later + self.set_forward_tlc_status(state, result.payment_hash, true); + myself.send_after(RETRYABLE_TLC_OPS_INTERVAL, || { + ChannelActorMessage::Event(ChannelEvent::CheckTlcRetryOperation) + }); + } + ProcessingChannelError::RepeatedProcessing(_) => { + // ignore repeated processing error, we have already handled it + state.tlc_state.remove_pending_tlc_operation(op); + } + _ => { + let error = ProcessingChannelError::TlcForwardingError(tlc_err); + let err = error.with_shared_secret(peeled_onion.shared_secret.clone()); + self.process_add_tlc_error( + myself, + state, + result.payment_hash, + TLCId::Received(result.tlc_id), + err, + ) + .await; + state.tlc_state.remove_pending_tlc_operation(op); + } + } + } + } + } + // This is the dual of `handle_tx_collaboration_msg`. Any logic error here is likely // to present in the other function as well. pub fn handle_tx_collaboration_command( @@ -1725,14 +1780,33 @@ where } ChannelCommand::CommitmentSigned() => self.handle_commitment_signed_command(state), ChannelCommand::AddTlc(command, reply) => { - match self.handle_add_tlc_command(state, command) { + match self.handle_add_tlc_command(state, command.clone()) { Ok(tlc_id) => { let _ = reply.send(Ok(AddTlcResponse { tlc_id })); Ok(()) } Err(err) => { - debug!("Error processing AddTlc command: {:?}", &err); let tlc_err = self.get_tlc_error(state, &err).await; + if let Some((channel_id, tlc_id)) = command.previous_tlc { + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ControlFiberChannel( + ChannelCommandWithId { + channel_id: channel_id, + command: ChannelCommand::ForwardTlcResult( + ForwardTlcResult { + channel_id, + payment_hash: command.payment_hash, + tlc_id, + channel_error: err.clone(), + tlc_err: Some(tlc_err.clone()), + }, + ), + }, + ), + )) + .expect("network actor alive"); + } let _ = reply.send(Err(tlc_err)); Err(err) } @@ -1789,6 +1863,11 @@ where } } } + ChannelCommand::ForwardTlcResult(forward_tlc_res) => { + self.handle_forward_tlc_result(myself, state, forward_tlc_res) + .await; + Ok(()) + } #[cfg(test)] ChannelCommand::ReloadState() => { *state = self @@ -2239,6 +2318,21 @@ where self.store.insert_channel_actor_state(state.clone()); Ok(()) } + + async fn post_start( + &self, + myself: ActorRef, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + if state.tlc_state.has_pending_operations() { + myself + .send_message(ChannelActorMessage::Event( + ChannelEvent::CheckTlcRetryOperation, + )) + .expect("myself alive"); + } + Ok(()) + } } #[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -2484,7 +2578,7 @@ impl From for TlcNotifyInfo { pub enum RetryableTlcOperation { RemoveTlc(TLCId, RemoveTlcReason), RelayRemoveTlc(Hash256, u64, RemoveTlcReason), - ForwardTlc(Hash256, TLCId, PeeledPaymentOnionPacket), + ForwardTlc(Hash256, TLCId, PeeledPaymentOnionPacket, bool), } impl Debug for RetryableTlcOperation { @@ -2501,7 +2595,7 @@ impl Debug for RetryableTlcOperation { .field(tlc_id) .field(reason) .finish(), - RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, _) => f + RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, ..) => f .debug_tuple("ForwardTlc") .field(payment_hash) .field(tlc_id) @@ -2667,9 +2761,25 @@ impl TlcState { self.retryable_tlc_operations.clone() } + pub fn has_pending_operations(&self) -> bool { + !self.retryable_tlc_operations.is_empty() + } + pub fn remove_pending_tlc_operation(&mut self, retryable_tlc_op: &RetryableTlcOperation) { self.retryable_tlc_operations .retain(|op| op != retryable_tlc_op); + + // if we already finished the RemoveTlc operation for the tlc, + // we should also remove the ForwardTlc to avoid any later retry. + match retryable_tlc_op { + RetryableTlcOperation::RemoveTlc(tlc_id, _) => { + self.retryable_tlc_operations.retain(|op| match op { + RetryableTlcOperation::ForwardTlc(_, id, ..) => id != tlc_id, + _ => true, + }); + } + _ => {} + } } pub fn add_offered_tlc(&mut self, tlc: TlcInfo) { @@ -4467,6 +4577,10 @@ impl ChannelActorState { to_local_amount, to_remote_amount, tlc_id, reason); } self.tlc_state.apply_remove_tlc(tlc_id); + debug!( + "Removed tlc payment_hash {:?} with reason {:?}", + current.payment_hash, reason + ); Ok((current.clone(), reason)) } @@ -4637,10 +4751,10 @@ impl ChannelActorState { local: local_commitment_number, remote: remote_commitment_number, } = tlc.get_commitment_numbers(); - debug!( - "Local commitment number: {}, remote commitment number: {}", - local_commitment_number, remote_commitment_number - ); + // debug!( + // "Local commitment number: {}, remote commitment number: {}", + // local_commitment_number, remote_commitment_number + // ); let local_pubkey = derive_tlc_pubkey( &self.get_local_channel_public_keys().tlc_base_key, &self.get_local_commitment_point(remote_commitment_number), diff --git a/src/fiber/network.rs b/src/fiber/network.rs index a88afdfe2..aed2fb245 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -502,6 +502,7 @@ pub struct SendOnionPacketCommand { pub peeled_onion_packet: PeeledPaymentOnionPacket, pub previous_tlc: Option<(Hash256, u64)>, pub payment_hash: Hash256, + pub wait_for_add_tlc_reply: bool, } impl NetworkActorMessage { @@ -1366,6 +1367,7 @@ where peeled_onion_packet, previous_tlc, payment_hash, + wait_for_add_tlc_reply, } = command; let info = peeled_onion_packet.current.clone(); @@ -1407,8 +1409,12 @@ where // we have already checked the channel_id is valid, match state.send_command_to_channel(*channel_id, command).await { Ok(()) => { - let add_tlc_res = recv.await.expect("recv error").map(|res| res.tlc_id); - reply.send(add_tlc_res).expect("send error"); + if wait_for_add_tlc_reply { + let add_tlc_res = recv.await.expect("recv error").map(|res| res.tlc_id); + reply.send(add_tlc_res).expect("send error"); + } else { + reply.send(Ok(u64::MAX)).expect("send error"); + } } Err(err) => { error!( @@ -1613,6 +1619,7 @@ where peeled_onion_packet, previous_tlc: None, payment_hash: payment_data.payment_hash, + wait_for_add_tlc_reply: true, }; self.handle_send_onion_packet_command(state, command, rpc_reply) @@ -1679,7 +1686,6 @@ where // If this is the first hop error, like the WaitingTlcAck error, // we will just retry later, return Ok here for letting endpoint user // know payment session is created successfully - // self.store.insert_payment_session(payment_session.clone()); myself.send_after(Duration::from_millis(500), move || { NetworkActorMessage::new_event(NetworkActorEvent::RetrySendPayment( payment_hash, diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs index 35d2072c9..a57480325 100644 --- a/src/fiber/tests/channel.rs +++ b/src/fiber/tests/channel.rs @@ -1450,7 +1450,7 @@ async fn test_send_payment_with_max_nodes() { assert!(res.fee > 0); // sleep for 2 seconds to make sure the payment is sent - tokio::time::sleep(tokio::time::Duration::from_millis(8000)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(12000)).await; let message = |rpc_reply| -> NetworkActorMessage { NetworkActorMessage::Command(NetworkActorCommand::GetPayment(res.payment_hash, rpc_reply)) @@ -5298,7 +5298,7 @@ async fn test_send_payment_will_succeed_with_retry_in_middle_hops() { .unwrap(); let payment_hash = res.payment_hash; - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; let fee = res.fee; eprintln!("fee: {:?}", fee); diff --git a/src/fiber/tests/payment.rs b/src/fiber/tests/payment.rs index 7d3f27ff0..2c34242bf 100644 --- a/src/fiber/tests/payment.rs +++ b/src/fiber/tests/payment.rs @@ -510,7 +510,7 @@ async fn test_send_payment_with_route_to_self_with_hop_hints() { assert!(res.is_ok()); // sleep for a while - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; let res = res.unwrap(); let payment_hash = res.payment_hash; node_0 @@ -1370,7 +1370,7 @@ async fn test_send_payment_bench_test() { let mut all_sent = HashSet::new(); - for i in 1..=15 { + for i in 1..=10 { let payment = node_0.send_payment_keysend(&node_2, 1000).await.unwrap(); eprintln!("payment: {:?}", payment); all_sent.insert(payment.payment_hash); @@ -1480,7 +1480,7 @@ async fn test_send_payment_three_nodes_send_each_other_bench_test() { let mut all_sent = vec![]; - for i in 1..=8 { + for i in 1..=5 { let payment1 = node_0.send_payment_keysend(&node_2, 1000).await.unwrap(); all_sent.push(payment1.payment_hash); eprintln!("send: {} payment_hash: {:?} sent", i, payment1.payment_hash); diff --git a/src/fiber/types.rs b/src/fiber/types.rs index c65461f45..a96c8334f 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -1,4 +1,6 @@ -use super::channel::{ChannelFlags, CHANNEL_DISABLED_FLAG, MESSAGE_OF_NODE2_FLAG}; +use super::channel::{ + ChannelFlags, ProcessingChannelError, CHANNEL_DISABLED_FLAG, MESSAGE_OF_NODE2_FLAG, +}; use super::config::AnnouncedNodeName; use super::gen::fiber::{ self as molecule_fiber, ChannelUpdateOpt, PaymentPreimageOpt, PubNonce as Byte66, PubkeyOpt, @@ -1653,6 +1655,15 @@ impl TryFrom for AnnouncementSignatures } } +#[derive(Debug, Clone)] +pub struct ForwardTlcResult { + pub channel_id: Hash256, + pub payment_hash: Hash256, + pub tlc_id: u64, + pub channel_error: ProcessingChannelError, + pub tlc_err: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] pub struct NodeAnnouncement { // Signature to this message, may be empty the message is not signed yet.