Skip to content

Commit

Permalink
add handle add_tlc result for forwarding tlc
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 5, 2025
1 parent 9ee7019 commit 5792caf
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 65 deletions.
226 changes: 170 additions & 56 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use tentacle::secio::PeerId;
use thiserror::Error;
use tokio::sync::oneshot;

use super::types::ForwardTlcResult;
use std::{
collections::HashSet,
fmt::{self, Debug},
Expand Down Expand Up @@ -146,6 +147,7 @@ pub enum ChannelCommand {
),
Shutdown(ShutdownCommand, RpcReplyPort<Result<(), String>>),
Update(UpdateCommand, RpcReplyPort<Result<(), String>>),
ForwardTlcResult(ForwardTlcResult),
#[cfg(test)]
ReloadState(),
}
Expand Down Expand Up @@ -718,7 +720,7 @@ where
}
}

async fn handle_add_tlc_error(
async fn process_add_tlc_error(
&self,
myself: &ActorRef<ChannelActorMessage>,
state: &mut ChannelActorState,
Expand Down Expand Up @@ -775,7 +777,7 @@ where
for add_tlc in apply_tlcs {
assert!(add_tlc.is_received());
if let Err(error) = self.apply_add_tlc_operation(myself, state, &add_tlc).await {
self.handle_add_tlc_error(
self.process_add_tlc_error(
myself,
state,
add_tlc.payment_hash,
Expand Down Expand Up @@ -1148,6 +1150,7 @@ where
peeled_onion_packet,
previous_tlc: Some((state.get_id(), added_tlc_id)),
payment_hash,
wait_for_add_tlc_reply: false,
},
rpc_reply,
),
Expand Down Expand Up @@ -1479,7 +1482,7 @@ where
peeled_onion_packet: PeeledPaymentOnionPacket,
) {
let forward_tlc =
RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, peeled_onion_packet);
RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, peeled_onion_packet, true);
self.register_retryable_tlc_operation(myself, state, forward_tlc)
.await;
}
Expand All @@ -1499,14 +1502,30 @@ where
}
}

fn set_forward_tlc_status(
&self,
state: &mut ChannelActorState,
payment_hash: Hash256,
try_one_time: bool,
) {
if let Some(RetryableTlcOperation::ForwardTlc(_, _, _, ref mut sent)) = state
.tlc_state
.retryable_tlc_operations
.iter_mut()
.find(|op| matches!(op, RetryableTlcOperation::ForwardTlc(ph, _, _, _) if *ph == payment_hash))
{
*sent = try_one_time;
}
}

pub async fn apply_retryable_tlc_operations(
&self,
myself: &ActorRef<ChannelActorMessage>,
state: &mut ChannelActorState,
) {
let pending_tlc_ops = state.tlc_state.get_pending_operations();
for retryable_operation in pending_tlc_ops.into_iter() {
let need_retry = match retryable_operation {
let keep = match retryable_operation {
RetryableTlcOperation::RemoveTlc(tlc_id, ref reason) => {
match self.handle_remove_tlc_command(
state,
Expand All @@ -1516,20 +1535,8 @@ where
},
) {
Ok(_) | Err(ProcessingChannelError::RepeatedProcessing(_)) => false,
Err(ProcessingChannelError::WaitingTlcAck) => {
error!(
"Failed to remove tlc: {:?} because of WaitingTlcAck, retry it later",
&retryable_operation
);
true
}
Err(err) => {
error!(
"Failed to remove tlc: {:?} with reason: {:?}, will not retry",
&retryable_operation, err
);
false
}
Err(ProcessingChannelError::WaitingTlcAck) => true,
Err(_err) => false,
}
}
RetryableTlcOperation::RelayRemoveTlc(channel_id, tlc_id, ref reason) => {
Expand Down Expand Up @@ -1557,59 +1564,107 @@ where
payment_hash,
tlc_id,
ref peeled_onion_packet,
try_one_time,
) => {
let res = self
// there is a potential deadlock for waiting the result from another channel actor
// for the scenario these two things happen at the same time:
// 1. channel A send forward tlc to channel B
// 2. channel B send forward tlc to channel A
// we may end up waiting for each other forever
//
// but we need the result for better error handling
// so we introduce the ForwardTlcResult to get the resule based on actor message

if !try_one_time {
// we need to decide whether to retry it until we get ForwardTlcResult
continue;
}
match self
.handle_forward_onion_packet(
state,
payment_hash,
peeled_onion_packet.clone(),
tlc_id.into(),
)
.await;
match res {
Ok(_) => false,
Err(err) => match err {
ProcessingChannelError::WaitingTlcAck
| ProcessingChannelError::TlcForwardingError(TlcErr {
error_code: TlcErrorCode::WaitingTlcAck,
..
}) => {
error!(
"Failed to forward tlc: {:?} because of WaitingTlcAck, retry it later",
&payment_hash
);
true
}
_ => {
let shared_secret = peeled_onion_packet.shared_secret.clone();
let err = err.with_shared_secret(shared_secret);
self.handle_add_tlc_error(myself, state, payment_hash, tlc_id, err)
.await;
error!(
"Failed to forward tlc: {:?} because of TlcForwardingError, will not retry",
&payment_hash
);
false
}
},
.await
{
Ok(_) => {
// here we just make sure the forward tlc is sent, we don't need to wait for the result
// retry it if necessary until we get ForwardTlcResult
self.set_forward_tlc_status(state, payment_hash, false);
true
}
Err(err) => {
let shared_secret = peeled_onion_packet.shared_secret.clone();
let err = err.with_shared_secret(shared_secret);
self.process_add_tlc_error(myself, state, payment_hash, tlc_id, err)
.await;
false
}
}
}
};
if !need_retry {
if !keep {
state
.tlc_state
.remove_pending_tlc_operation(&retryable_operation);
}
}

// If there are more pending removes, we will retry it later
if !state.tlc_state.get_pending_operations().is_empty() {
if state.tlc_state.has_pending_operations() {
myself.send_after(RETRYABLE_TLC_OPS_INTERVAL, || {
ChannelActorMessage::Event(ChannelEvent::CheckTlcRetryOperation)
});
}
}

async fn handle_forward_tlc_result(
&self,
myself: &ActorRef<ChannelActorMessage>,
state: &mut ChannelActorState,
result: ForwardTlcResult,
) {
let pending_ops = state.tlc_state.get_pending_operations();
if let Some((op, peeled_onion)) = pending_ops.iter().find_map(|op| match op {
RetryableTlcOperation::ForwardTlc(payment_hash, _, peel_onion_packet, _)
if *payment_hash == result.payment_hash =>
{
Some((op, peel_onion_packet))
}
_ => None,
}) {
if let Some(tlc_err) = result.tlc_err {
match result.channel_error {
ProcessingChannelError::WaitingTlcAck => {
// if we get WaitingTlcAck error, we will retry it later
self.set_forward_tlc_status(state, result.payment_hash, true);
myself.send_after(RETRYABLE_TLC_OPS_INTERVAL, || {
ChannelActorMessage::Event(ChannelEvent::CheckTlcRetryOperation)
});
}
ProcessingChannelError::RepeatedProcessing(_) => {
// ignore repeated processing error, we have already handled it
state.tlc_state.remove_pending_tlc_operation(op);
}
_ => {
let error = ProcessingChannelError::TlcForwardingError(tlc_err);
let err = error.with_shared_secret(peeled_onion.shared_secret.clone());
self.process_add_tlc_error(
myself,
state,
result.payment_hash,
TLCId::Received(result.tlc_id),
err,
)
.await;
state.tlc_state.remove_pending_tlc_operation(op);
}
}
}
}
}

// This is the dual of `handle_tx_collaboration_msg`. Any logic error here is likely
// to present in the other function as well.
pub fn handle_tx_collaboration_command(
Expand Down Expand Up @@ -1725,14 +1780,33 @@ where
}
ChannelCommand::CommitmentSigned() => self.handle_commitment_signed_command(state),
ChannelCommand::AddTlc(command, reply) => {
match self.handle_add_tlc_command(state, command) {
match self.handle_add_tlc_command(state, command.clone()) {
Ok(tlc_id) => {
let _ = reply.send(Ok(AddTlcResponse { tlc_id }));
Ok(())
}
Err(err) => {
debug!("Error processing AddTlc command: {:?}", &err);
let tlc_err = self.get_tlc_error(state, &err).await;
if let Some((channel_id, tlc_id)) = command.previous_tlc {
self.network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::ControlFiberChannel(
ChannelCommandWithId {
channel_id: channel_id,
command: ChannelCommand::ForwardTlcResult(
ForwardTlcResult {
channel_id,
payment_hash: command.payment_hash,
tlc_id,
channel_error: err.clone(),
tlc_err: Some(tlc_err.clone()),
},
),
},
),
))
.expect("network actor alive");
}
let _ = reply.send(Err(tlc_err));
Err(err)
}
Expand Down Expand Up @@ -1789,6 +1863,11 @@ where
}
}
}
ChannelCommand::ForwardTlcResult(forward_tlc_res) => {
self.handle_forward_tlc_result(myself, state, forward_tlc_res)
.await;
Ok(())
}
#[cfg(test)]
ChannelCommand::ReloadState() => {
*state = self
Expand Down Expand Up @@ -2239,6 +2318,21 @@ where
self.store.insert_channel_actor_state(state.clone());
Ok(())
}

async fn post_start(
&self,
myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if state.tlc_state.has_pending_operations() {
myself
.send_message(ChannelActorMessage::Event(
ChannelEvent::CheckTlcRetryOperation,
))
.expect("myself alive");
}
Ok(())
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -2484,7 +2578,7 @@ impl From<TlcInfo> for TlcNotifyInfo {
pub enum RetryableTlcOperation {
RemoveTlc(TLCId, RemoveTlcReason),
RelayRemoveTlc(Hash256, u64, RemoveTlcReason),
ForwardTlc(Hash256, TLCId, PeeledPaymentOnionPacket),
ForwardTlc(Hash256, TLCId, PeeledPaymentOnionPacket, bool),
}

impl Debug for RetryableTlcOperation {
Expand All @@ -2501,7 +2595,7 @@ impl Debug for RetryableTlcOperation {
.field(tlc_id)
.field(reason)
.finish(),
RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, _) => f
RetryableTlcOperation::ForwardTlc(payment_hash, tlc_id, ..) => f
.debug_tuple("ForwardTlc")
.field(payment_hash)
.field(tlc_id)
Expand Down Expand Up @@ -2667,9 +2761,25 @@ impl TlcState {
self.retryable_tlc_operations.clone()
}

pub fn has_pending_operations(&self) -> bool {
!self.retryable_tlc_operations.is_empty()
}

pub fn remove_pending_tlc_operation(&mut self, retryable_tlc_op: &RetryableTlcOperation) {
self.retryable_tlc_operations
.retain(|op| op != retryable_tlc_op);

// if we already finished the RemoveTlc operation for the tlc,
// we should also remove the ForwardTlc to avoid any later retry.
match retryable_tlc_op {
RetryableTlcOperation::RemoveTlc(tlc_id, _) => {
self.retryable_tlc_operations.retain(|op| match op {
RetryableTlcOperation::ForwardTlc(_, id, ..) => id != tlc_id,
_ => true,
});
}
_ => {}
}
}

pub fn add_offered_tlc(&mut self, tlc: TlcInfo) {
Expand Down Expand Up @@ -4467,6 +4577,10 @@ impl ChannelActorState {
to_local_amount, to_remote_amount, tlc_id, reason);
}
self.tlc_state.apply_remove_tlc(tlc_id);
debug!(
"Removed tlc payment_hash {:?} with reason {:?}",
current.payment_hash, reason
);

Ok((current.clone(), reason))
}
Expand Down Expand Up @@ -4637,10 +4751,10 @@ impl ChannelActorState {
local: local_commitment_number,
remote: remote_commitment_number,
} = tlc.get_commitment_numbers();
debug!(
"Local commitment number: {}, remote commitment number: {}",
local_commitment_number, remote_commitment_number
);
// debug!(
// "Local commitment number: {}, remote commitment number: {}",
// local_commitment_number, remote_commitment_number
// );
let local_pubkey = derive_tlc_pubkey(
&self.get_local_channel_public_keys().tlc_base_key,
&self.get_local_commitment_point(remote_commitment_number),
Expand Down
Loading

0 comments on commit 5792caf

Please sign in to comment.