diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index bd39b899a..c6f8bb9a9 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -13,8 +13,6 @@ use tokio::sync::mpsc::error::TrySendError; use std::fmt::Debug; -use crate::ckb::NetworkActorEvent; - use super::{ network::{OpenChannelCommand, PCNMessageWithPeerId}, serde_utils::EntityWrapperBase64, @@ -22,11 +20,14 @@ use super::{ AcceptChannel, ChannelReady, CommitmentSigned, Hash256, OpenChannel, PCNMessage, Privkey, Pubkey, TxAdd, TxCollaborationMsg, TxComplete, TxRemove, }, - NetworkActorCommand, NetworkActorMessage, + NetworkActorCommand, NetworkActorEvent, NetworkActorMessage, }; pub enum ChannelActorMessage { + /// Command are the messages that are sent to the channel actor to perform some action. + /// It is normally generated from a user request. Command(ChannelCommand), + /// PeerMessage are the messages sent from the peer. PeerMessage(PCNMessage), } @@ -403,13 +404,13 @@ impl Actor for ChannelActor { async fn handle( &self, - _myself: ActorRef, + myself: ActorRef, message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match message { ChannelActorMessage::PeerMessage(message) => { - if let Err(error) = state.handle_peer_message(message) { + if let Err(error) = state.handle_peer_message(message, self.network.clone()) { error!("Error while processing channel message: {:?}", error); } } @@ -701,13 +702,21 @@ impl ChannelActorState { pub fn handle_peer_message( &mut self, message: PCNMessage, + network: ActorRef, ) -> Result<(), ProcessingChannelError> { match message { PCNMessage::OpenChannel(_) => { panic!("OpenChannel message should be processed while prestarting") } PCNMessage::AcceptChannel(accept_channel) => { - self.handle_accept_channel_message(accept_channel) + self.handle_accept_channel_message(accept_channel)?; + self.fill_in_channel_id(); + network + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::ChannelAccepted(self.id(), self.temp_id), + )) + .expect("network actor alive"); + Ok(()) } PCNMessage::TxAdd(tx) => { self.handle_tx_collaboration_msg(TxCollaborationMsg::TxAdd(tx)) diff --git a/src/ckb/network.rs b/src/ckb/network.rs index a1e1af148..098f7d79e 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -88,7 +88,10 @@ pub enum NetworkActorEvent { PeerMessage(PeerId, SessionContext, PCNMessage), /// Channel related events. + /// A new channel is created and the peer id and actor reference is given here. ChannelCreated(Hash256, PeerId, ActorRef), + /// A channel has been accepted. The two Hash256 are respectively newly agreed channel id and temp channel id. + ChannelAccepted(Hash256, Hash256), /// Network service events to be sent to outside observers. NetworkServiceEvent(NetworkServiceEvent), @@ -181,7 +184,7 @@ impl NetworkActor { debug!("Test message {:?}", test); } - _ => match state.channels.remove(&message.get_channel_id()) { + _ => match state.channels.get(&message.get_channel_id()) { None => { return Err(Error::ChannelNotFound(message.get_channel_id())); } @@ -328,9 +331,10 @@ impl NetworkActorState { fn on_channel_created( &mut self, id: Hash256, - _peer_id: PeerId, + peer_id: PeerId, actor: ActorRef, ) { + debug!("Channel to peer {:?} created: {:?}", &peer_id, &id); self.channels.insert(id, actor); } } @@ -408,6 +412,12 @@ impl Actor for NetworkActor { NetworkActorEvent::ChannelCreated(channel_id, peer_id, actor) => { state.on_channel_created(channel_id, peer_id, actor) } + NetworkActorEvent::ChannelAccepted(new, old) => { + assert_ne!(new, old, "new and old channel id must be different"); + let channel = state.channels.remove(&old).expect("channel exists"); + state.channels.insert(new, channel); + debug!("Channel accepted: {:?} -> {:?}", old, new); + } NetworkActorEvent::PeerMessage(peer_id, session, message) => { self.handle_peer_message(myself, state, peer_id, session, message) .await?