Skip to content

Commit

Permalink
Implement updating channel id while handling accepting channels
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Apr 18, 2024
1 parent 5c73fe3 commit 1db7d68
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
21 changes: 15 additions & 6 deletions src/ckb/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@ use tokio::sync::mpsc::error::TrySendError;

use std::fmt::Debug;

use crate::ckb::NetworkActorEvent;

use super::{
network::{OpenChannelCommand, PCNMessageWithPeerId},
serde_utils::EntityWrapperBase64,
types::{
AcceptChannel, ChannelReady, CommitmentSigned, Hash256, OpenChannel, PCNMessage, Privkey,
Pubkey, TxAdd, TxCollaborationMsg, TxComplete, TxRemove,
},
NetworkActorCommand, NetworkActorMessage,
NetworkActorCommand, NetworkActorEvent, NetworkActorMessage,
};

pub enum ChannelActorMessage {
/// Command are the messages that are sent to the channel actor to perform some action.
/// It is normally generated from a user request.
Command(ChannelCommand),
/// PeerMessage are the messages sent from the peer.
PeerMessage(PCNMessage),
}

Expand Down Expand Up @@ -403,13 +404,13 @@ impl Actor for ChannelActor {

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ChannelActorMessage::PeerMessage(message) => {
if let Err(error) = state.handle_peer_message(message) {
if let Err(error) = state.handle_peer_message(message, self.network.clone()) {
error!("Error while processing channel message: {:?}", error);
}
}
Expand Down Expand Up @@ -701,13 +702,21 @@ impl ChannelActorState {
pub fn handle_peer_message(
&mut self,
message: PCNMessage,
network: ActorRef<NetworkActorMessage>,
) -> Result<(), ProcessingChannelError> {
match message {
PCNMessage::OpenChannel(_) => {
panic!("OpenChannel message should be processed while prestarting")
}
PCNMessage::AcceptChannel(accept_channel) => {
self.handle_accept_channel_message(accept_channel)
self.handle_accept_channel_message(accept_channel)?;
self.fill_in_channel_id();
network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::ChannelAccepted(self.id(), self.temp_id),
))
.expect("network actor alive");
Ok(())
}
PCNMessage::TxAdd(tx) => {
self.handle_tx_collaboration_msg(TxCollaborationMsg::TxAdd(tx))
Expand Down
14 changes: 12 additions & 2 deletions src/ckb/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ pub enum NetworkActorEvent {
PeerMessage(PeerId, SessionContext, PCNMessage),

/// Channel related events.
/// A new channel is created and the peer id and actor reference is given here.
ChannelCreated(Hash256, PeerId, ActorRef<ChannelActorMessage>),
/// A channel has been accepted. The two Hash256 are respectively newly agreed channel id and temp channel id.
ChannelAccepted(Hash256, Hash256),

/// Network service events to be sent to outside observers.
NetworkServiceEvent(NetworkServiceEvent),
Expand Down Expand Up @@ -181,7 +184,7 @@ impl NetworkActor {
debug!("Test message {:?}", test);
}

_ => match state.channels.remove(&message.get_channel_id()) {
_ => match state.channels.get(&message.get_channel_id()) {
None => {
return Err(Error::ChannelNotFound(message.get_channel_id()));
}
Expand Down Expand Up @@ -328,9 +331,10 @@ impl NetworkActorState {
fn on_channel_created(
&mut self,
id: Hash256,
_peer_id: PeerId,
peer_id: PeerId,
actor: ActorRef<ChannelActorMessage>,
) {
debug!("Channel to peer {:?} created: {:?}", &peer_id, &id);
self.channels.insert(id, actor);
}
}
Expand Down Expand Up @@ -408,6 +412,12 @@ impl Actor for NetworkActor {
NetworkActorEvent::ChannelCreated(channel_id, peer_id, actor) => {
state.on_channel_created(channel_id, peer_id, actor)
}
NetworkActorEvent::ChannelAccepted(new, old) => {
assert_ne!(new, old, "new and old channel id must be different");
let channel = state.channels.remove(&old).expect("channel exists");
state.channels.insert(new, channel);
debug!("Channel accepted: {:?} -> {:?}", old, new);
}
NetworkActorEvent::PeerMessage(peer_id, session, message) => {
self.handle_peer_message(myself, state, peer_id, session, message)
.await?
Expand Down

0 comments on commit 1db7d68

Please sign in to comment.