Skip to content

Commit

Permalink
Simplify channel message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Apr 18, 2024
1 parent 074f542 commit 5c73fe3
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 69 deletions.
117 changes: 58 additions & 59 deletions src/ckb/channel.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bitflags::bitflags;
use ckb_hash::{blake2b_256, new_blake2b};
use ckb_types::packed::{OutPoint, Transaction};
use log::{debug, error, info};
use log::{debug, error, info, warn};
use molecule::prelude::Entity;
use musig2::SecNonce;
use ractor::{async_trait as rasync_trait, Actor, ActorProcessingErr, ActorRef};
Expand Down Expand Up @@ -137,23 +137,6 @@ impl ChannelActor {
Ok(())
}

pub fn handle_peer_message(
&self,
state: &mut ChannelActorState,
message: PCNMessage,
) -> Result<(), ProcessingChannelError> {
match message {
PCNMessage::OpenChannel(_) => {
panic!("OpenChannel message should be processed while prestarting")
}
PCNMessage::AcceptChannel(accept_channel) => {
state.step(ChannelEvent::AcceptChannel(accept_channel))?;
}
_ => {}
}
Ok(())
}

pub fn handle_tx_collaboration_command(
&self,
state: &mut ChannelActorState,
Expand Down Expand Up @@ -426,7 +409,7 @@ impl Actor for ChannelActor {
) -> Result<(), ActorProcessingErr> {
match message {
ChannelActorMessage::PeerMessage(message) => {
if let Err(error) = self.handle_peer_message(state, message) {
if let Err(error) = state.handle_peer_message(message) {
error!("Error while processing channel message: {:?}", error);
}
}
Expand Down Expand Up @@ -503,12 +486,6 @@ fn blake2b_hash_with_salt(data: &[u8], salt: &[u8]) -> [u8; 32] {
result
}

impl ChannelActorState {
pub fn id(&self) -> Hash256 {
self.id.unwrap_or(self.temp_id)
}
}

#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ClosedChannel {}

Expand Down Expand Up @@ -717,6 +694,62 @@ impl ChannelActorState {
}
}

pub fn id(&self) -> Hash256 {
self.id.unwrap_or(self.temp_id)
}

pub fn handle_peer_message(
&mut self,
message: PCNMessage,
) -> Result<(), ProcessingChannelError> {
match message {
PCNMessage::OpenChannel(_) => {
panic!("OpenChannel message should be processed while prestarting")
}
PCNMessage::AcceptChannel(accept_channel) => {
self.handle_accept_channel_message(accept_channel)
}
PCNMessage::TxAdd(tx) => {
self.handle_tx_collaboration_msg(TxCollaborationMsg::TxAdd(tx))
}
PCNMessage::TxRemove(tx) => {
self.handle_tx_collaboration_msg(TxCollaborationMsg::TxRemove(tx))
}
PCNMessage::TxComplete(tx) => {
self.handle_tx_collaboration_msg(TxCollaborationMsg::TxComplete(tx))
}
PCNMessage::CommitmentSigned(commitment_signed) => {
self.handle_commitment_signed_message(commitment_signed)
}
PCNMessage::ChannelReady(channel_ready) => {
match self.state {
ChannelState::FundingNegotiated => {
self.state = ChannelState::AwaitingChannelReady(
AwaitingChannelReadyFlags::OUR_CHANNEL_READY,
);
}
ChannelState::AwaitingChannelReady(
AwaitingChannelReadyFlags::THEIR_CHANNEL_READY,
) => {
self.state = ChannelState::ChannelReady(ChannelReadyFlags::empty());
}
_ => {
return Err(ProcessingChannelError::InvalidState(
"received ChannelReady message, but we're not ready for ChannelReady"
.to_string(),
));
}
}
debug!("ChannelReady: {:?}", &channel_ready);
Ok(())
}
_ => {
warn!("Received unsupported message: {:?}", &message);
Ok(())
}
}
}

pub fn handle_accept_channel_message(
&mut self,
accept_channel: AcceptChannel,
Expand Down Expand Up @@ -841,40 +874,6 @@ impl ChannelActorState {
self.id = Some(channel_id);
}

pub fn step(&mut self, event: ChannelEvent) -> ProcessingChannelResult {
match event {
ChannelEvent::AcceptChannel(accept_channel) => {
self.handle_accept_channel_message(accept_channel)
}
ChannelEvent::TxCollaborationMsg(msg) => self.handle_tx_collaboration_msg(msg),
ChannelEvent::CommitmentSigned(commitment_signed) => {
self.handle_commitment_signed_message(commitment_signed)
}
ChannelEvent::ChannelReady(channel_ready) => {
match self.state {
ChannelState::FundingNegotiated => {
self.state = ChannelState::AwaitingChannelReady(
AwaitingChannelReadyFlags::OUR_CHANNEL_READY,
);
}
ChannelState::AwaitingChannelReady(
AwaitingChannelReadyFlags::THEIR_CHANNEL_READY,
) => {
self.state = ChannelState::ChannelReady(ChannelReadyFlags::empty());
}
_ => {
return Err(ProcessingChannelError::InvalidState(
"received ChannelReady message, but we're not ready for ChannelReady"
.to_string(),
));
}
}
debug!("ChannelReady: {:?}", &channel_ready);
Ok(())
}
}
}

pub fn is_funded(&self) -> bool {
match self.state {
ChannelState::ChannelReady(_) => {
Expand Down
14 changes: 4 additions & 10 deletions src/ckb/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,15 @@ impl NetworkActor {
debug!("Test message {:?}", test);
}

PCNMessage::AcceptChannel(m) => match state.channels.remove(&m.channel_id) {
_ => match state.channels.remove(&message.get_channel_id()) {
None => {
return Err(Error::ChannelNotFound(m.channel_id));
return Err(Error::ChannelNotFound(message.get_channel_id()));
}
Some(c) => {
c.send_message(ChannelActorMessage::PeerMessage(PCNMessage::AcceptChannel(
m,
)))
.expect("channel actor alive");
c.send_message(ChannelActorMessage::PeerMessage(message))
.expect("channel actor alive");
}
},

_ => {
error!("Message handling for {:?} unimplemented", message);
}
};
Ok(())
}
Expand Down
25 changes: 25 additions & 0 deletions src/ckb/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,31 @@ pub enum PCNMessage {
RemoveTlc(RemoveTlc),
}

impl PCNMessage {
pub fn get_channel_id(&self) -> Hash256 {
match &self {
PCNMessage::TestMessage(_) => unreachable!(),
PCNMessage::OpenChannel(open_channel) => open_channel.channel_id,
PCNMessage::AcceptChannel(accept_channel) => accept_channel.channel_id,
PCNMessage::CommitmentSigned(commitment_signed) => commitment_signed.channel_id,
PCNMessage::TxSignatures(tx_signatures) => tx_signatures.channel_id,
PCNMessage::ChannelReady(channel_ready) => channel_ready.channel_id,
PCNMessage::TxAdd(tx_add) => tx_add.channel_id,
PCNMessage::TxRemove(tx_remove) => tx_remove.channel_id,
PCNMessage::TxComplete(tx_complete) => tx_complete.channel_id,
PCNMessage::TxAbort(tx_abort) => tx_abort.channel_id,
PCNMessage::TxInitRBF(tx_init_rbf) => tx_init_rbf.channel_id,
PCNMessage::TxAckRBF(tx_ack_rbf) => tx_ack_rbf.channel_id,
PCNMessage::Shutdown(shutdown) => shutdown.channel_id,
PCNMessage::ClosingSigned(closing_signed) => closing_signed.channel_id,
PCNMessage::AddTlc(add_tlc) => add_tlc.channel_id,
PCNMessage::TlcsSigned(tlcs_signed) => tlcs_signed.channel_id,
PCNMessage::RevokeAndAck(revoke_and_ack) => revoke_and_ack.channel_id,
PCNMessage::RemoveTlc(remove_tlc) => remove_tlc.channel_id,
}
}
}

impl From<PCNMessage> for molecule_pcn::PCNMessageUnion {
fn from(pcn_message: PCNMessage) -> Self {
match pcn_message {
Expand Down

0 comments on commit 5c73fe3

Please sign in to comment.