diff --git a/.vscode/launch.json b/.vscode/launch.json
index ad700c9..67d34b1 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -21,6 +21,7 @@
       "args": [],
       "cwd": "${workspaceFolder}",
       "env": {
+        "PSN_MGMT_PORT": "1918",
         "PSN_BROKER_INBOUND_HTTP_SERVER_ACCESSIBLE_ADDRESS_PREFIX": "http://example.com",
         "RUST_LOG": "service_network=trace,service_broker=trace"
       }
diff --git a/Cargo.lock b/Cargo.lock
index d91e488..7310618 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3964,6 +3964,25 @@ dependencies = [
  "rustc-hex",
 ]

+[[package]]
+name = "rust-fsm"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b9ee4731a0f53c973772b83c43c57a26e146b6fa024af5aeab972b63d678b65"
+dependencies = [
+ "rust-fsm-dsl",
+]
+
+[[package]]
+name = "rust-fsm-dsl"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44237c429621e3606374941c3061fe95686bdaddb9b4f6524e4edc2d21da9c58"
+dependencies = [
+ "quote 1.0.21",
+ "syn 1.0.99",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.21"
@@ -4270,6 +4289,7 @@ dependencies = [
  "prost 0.11.0",
  "rand 0.8.5",
  "reqwest",
+ "rust-fsm",
  "serde",
  "serde_json",
  "tokio",
diff --git a/Cargo.toml b/Cargo.toml
index d54d038..c5503f0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,5 +30,6 @@ phactory-api = { git = "https://github.com/Phala-Network/phala-blockchain", vers
 prost = "0.11.0"
 fast-socks5 = { git = "https://github.com/Phala-Network/fast-socks5" }
 trust-dns-resolver = { version = "0.22.0", features = ["dns-over-https-rustls"] }
+rust-fsm = "0.6.0"

 [workspace]
diff --git a/src/bin/service_worker/main.rs b/src/bin/service_worker/main.rs
index ad566a2..067a729 100644
--- a/src/bin/service_worker/main.rs
+++ b/src/bin/service_worker/main.rs
@@ -1,23 +1,21 @@
+use crate::runtime::*;
+use crate::WorkerRuntimeChannelMessage::*;
 use anyhow::Result;
 use env_logger::{Builder as LoggerBuilder, Target};
+use futures::future::try_join_all;
 use log::{debug, info, warn};
 use mdns_sd::{ServiceDaemon, ServiceInfo};
 use phactory_api::pruntime_client::{new_pruntime_client, PRuntimeClient};
 use service_network::config::{PeerConfig, PeerRole};
-use service_network::peer::{
-    my_ipv4_interfaces, BrokerPeerUpdate, BrokerPeerUpdateReceiver, BrokerPeerUpdateSender,
-    SERVICE_PSN_LOCAL_WORKER,
-};
+use service_network::peer::broker::BrokerPeerUpdate;
+use service_network::peer::local_worker::{BrokerPeerUpdateReceiver, BrokerPeerUpdateSender};
+use service_network::peer::{my_ipv4_interfaces, SERVICE_PSN_LOCAL_WORKER};
 use service_network::runtime::AsyncRuntimeContext;
-use service_network::worker::runtime::WorkerRuntimeChannelMessage::{
-    ShouldUpdateInfo, ShouldUpdateStatus,
-};
-use service_network::worker::runtime::{
-    WorkerRuntime, WorkerRuntimeChannelMessage, WorkerRuntimeStatus, WrappedWorkerRuntime,
-};
 use std::collections::HashMap;
 use std::net::Ipv4Addr;
-use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::sync::mpsc::Sender;
+
+pub mod runtime;

 #[macro_use]
 extern crate lazy_static;
@@ -51,111 +49,68 @@
     );
     debug!("Starting local worker broker with config: {:?}", &*CONFIG);

-    let (update_best_peer_tx, update_best_peer_rx) = tokio::sync::mpsc::channel(32);
+    let (update_best_peer_tx, _) = tokio::sync::mpsc::channel(1024);
     let (rt_tx, rt_rx) = tokio::sync::mpsc::channel(1024);

-    let handle1 = tokio::spawn(handle_update_best_peer(update_best_peer_rx, CONFIG.clone()));
-    let handle2 = tokio::spawn(runtime(update_best_peer_tx.clone(), rt_tx.clone(), rt_rx));
-
-    let wr = WR.read().await;
-    let pr = wr.prc;
-    let info = pr
-        .get_info(())
-        .await
-        .expect("Failed to get info from pruntime");
-    drop(wr);
-    let _ = rt_tx
-        .clone()
-        .send(WorkerRuntimeChannelMessage::ShouldUpdateInfo(info))
-        .await;
-
-    let _ = tokio::join!(handle1, handle2);
-}
+    let async_handles = vec![
+        tokio::spawn(handle_runtime_events(
+            update_best_peer_tx.clone(),
+            rt_tx.clone(),
+            rt_rx,
+        )),
+        tokio::spawn(check_pruntime_health(rt_tx.clone())),
+    ];

-async fn local_worker(
-    tx: BrokerPeerUpdateSender,
-    rt_tx: Sender<WorkerRuntimeChannelMessage>,
-    ctx: &AsyncRuntimeContext,
-) {
-    let mdns = ServiceDaemon::new().expect("Failed to create daemon");
-    let pm = &RT_CTX.peer_manager;
-    register_service(&mdns, &ctx).await;
-    pm.browse_brokers(&mdns, tx.clone(), rt_tx.clone(), &RT_CTX)
-        .await;
+    try_join_all(async_handles).await.expect("main failed");
 }

-async fn runtime(
+async fn handle_runtime_events(
     tx: BrokerPeerUpdateSender,
-    rt_tx: Sender<WorkerRuntimeChannelMessage>,
-    mut rx: Receiver<WorkerRuntimeChannelMessage>,
+    rt_tx: WorkerRuntimeChannelMessageSender,
+    mut rx: WorkerRuntimeChannelMessageReceiver,
 ) {
     while let Some(msg) = rx.recv().await {
         match msg {
-            ShouldUpdateInfo(i) => {
+            ShouldSetPRuntimeFailed(msg) => {
+                let wr = WR.read().await;
+                wr.handle_pruntime_failure(rt_tx.clone(), msg).await;
+                drop(wr);
+            }
+            ShouldSetBrokerFailed(msg) => {
+                let wr = WR.read().await;
+                wr.handle_broker_failure(rt_tx.clone(), msg).await;
+                drop(wr);
+            }
+            ShouldUpdateInfo(info) => {
                 let mut wr = WR.write().await;
-                wr.handle_update_info(i);
-                if matches!(wr.status, WorkerRuntimeStatus::Starting) {
-                    let rt_tx = rt_tx.clone();
-                    let _ = rt_tx
-                        .send(WorkerRuntimeChannelMessage::ShouldUpdateStatus(
-                            WorkerRuntimeStatus::WaitingForBroker,
-                        ))
-                        .await;
-                }
+                wr.handle_update_info(rt_tx.clone(), info).await;
                 drop(wr);
             }
             ShouldUpdateStatus(s) => {
                 let mut wr = WR.write().await;
-                wr.handle_update_status(s);
-                if matches!(wr.status, WorkerRuntimeStatus::WaitingForBroker) {
-                    tokio::spawn(local_worker(tx.clone(), rt_tx.clone(), &RT_CTX));
-                }
+                let pr = wr.prc;
+                wr.handle_update_status(s, tx.clone(), rt_tx.clone(), pr)
+                    .await;
                 drop(wr);
             }
-        }
-    }
-}
-
-async fn set_pruntime_network_with_peer(socks_url: String, _config: PeerConfig) -> Result<()> {
-    debug!(
-        "Trying to set pRuntime outbound with {}",
-        socks_url.as_str()
-    );
-
-    Ok(())
-}
-
-async fn handle_update_best_peer(mut rx: BrokerPeerUpdateReceiver, config: PeerConfig) {
-    while let Some(u) = rx.recv().await {
-        match u {
-            BrokerPeerUpdate::PeerStatusChanged(name, status) => {
-                info!("Broker {} changed its status to {:?}", name, status);
-            }
-            BrokerPeerUpdate::BestPeerChanged(instance_name, socks_url) => {
-                // todo: needs to be throttled
-                match tokio::spawn(set_pruntime_network_with_peer(
-                    socks_url.to_string(),
-                    config.clone(),
-                ))
-                .await
-                {
-                    Ok(_) => {
-                        info!(
-                            "Updated pRuntime networking with ({}):{}",
-                            instance_name.as_str(),
-                            socks_url.as_str()
-                        );
-                    }
-                    Err(err) => {
-                        warn!("Failed to set pRuntime networking: {:?}", err);
-                    }
-                }
+            ShouldLockBroker(peer) => {
+                let mut wr = WR.write().await;
+                wr.handle_lock_peer(peer.clone(), rt_tx.clone()).await;
+                drop(wr);
             }
-            _ => {}
         }
     }
 }

+// async fn handle_peer_update(
+//     mut rx: BrokerPeerUpdateReceiver,
+//     rt_tx: WorkerRuntimeChannelMessageSender,
+// ) {
+//     while let Some(u) = rx.recv().await {
+//         let _ = rt_tx.clone().send(ShouldUpdateBrokerPeer(u)).await;
+//     }
+// }
+
 async fn register_service(mdns: &ServiceDaemon, _ctx: &AsyncRuntimeContext) {
     let common_config = &CONFIG.common;
     let _worker_config = CONFIG.local_worker();
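For context: the reworked main.rs funnels every state change through a single mpsc channel of `WorkerRuntimeChannelMessage`s, and each handler takes the global `WR` RwLock only for the duration of one message, so producers (health checks, peer browsing) never hold the lock themselves. Below is a minimal, self-contained sketch of that pattern with simplified `Msg`/`State` stand-ins (not the project's actual types; assumes tokio with the "full" feature):

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

enum Msg {
    SetStatus(&'static str),
    Fail(String),
}

#[derive(Default)]
struct State {
    status: String,
}

#[tokio::main]
async fn main() {
    let state = Arc::new(RwLock::new(State::default()));
    let (tx, mut rx) = mpsc::channel::<Msg>(1024);

    // Producers only send messages; they never touch the lock.
    tokio::spawn({
        let tx = tx.clone();
        async move {
            let _ = tx.send(Msg::SetStatus("WaitingForBroker")).await;
            let _ = tx.send(Msg::Fail("broker went away".into())).await;
        }
    });
    drop(tx); // the loop below ends once all senders are gone

    // A single receive loop serializes all mutations of the shared state,
    // holding the write lock only while one message is handled.
    while let Some(msg) = rx.recv().await {
        match msg {
            Msg::SetStatus(s) => {
                state.write().await.status = s.to_string();
            }
            Msg::Fail(reason) => {
                let st = state.read().await;
                println!("failed while {:?}: {}", st.status, reason);
            }
        }
    }
}
```

The same design shows up in `handle_runtime_events` above: even failure reports re-enter through the channel (`ShouldSetPRuntimeFailed`, `ShouldSetBrokerFailed`) rather than mutating the runtime directly.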
diff --git a/src/bin/service_worker/runtime.rs b/src/bin/service_worker/runtime.rs
new file mode 100644
index 0000000..b8cf03a
--- /dev/null
+++ b/src/bin/service_worker/runtime.rs
@@ -0,0 +1,387 @@
+use crate::WorkerRuntimeStatus::*;
+use crate::{ShouldLockBroker, ShouldSetBrokerFailed, ShouldUpdateStatus, RT_CTX, WR};
+use anyhow::{anyhow, Context, Result};
+use log::{debug, error, info, warn};
+use mdns_sd::ServiceDaemon;
+use phactory_api::prpc::{NetworkConfig, PhactoryInfo};
+use phactory_api::pruntime_client::PRuntimeClient;
+use service_network::peer::local_worker::{BrokerPeerUpdateSender, WrappedBrokerPeer};
+use service_network::runtime::AsyncRuntimeContext;
+use std::fmt::Debug;
+use std::net::Ipv4Addr;
+use std::str::FromStr;
+use std::sync::Arc;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::sync::RwLock;
+use tokio::time::{sleep, Duration};
+
+#[derive(Debug, Clone)]
+pub enum WorkerRuntimeChannelMessage {
+    ShouldUpdateInfo(PhactoryInfo),
+    ShouldSetPRuntimeFailed(String),
+    ShouldSetBrokerFailed(String),
+    ShouldUpdateStatus(WorkerRuntimeStatus),
+    ShouldLockBroker(WrappedBrokerPeer),
+}
+
+pub type WorkerRuntimeChannelMessageSender = Sender<WorkerRuntimeChannelMessage>;
+pub type WorkerRuntimeChannelMessageReceiver = Receiver<WorkerRuntimeChannelMessage>;
+
+#[derive(Debug, Clone)]
+pub enum WorkerRuntimeStatus {
+    Starting,
+    WaitingForBroker,
+    PendingProvision,
+    Started,
+    Failed(WorkerRuntimeFailReason),
+}
+
+#[derive(Debug, Clone)]
+pub struct WorkerRuntimeFailReason {
+    pub broker: Option<String>,
+    pub pr: Option<String>,
+}
+
+pub type WrappedWorkerRuntime = Arc<RwLock<WorkerRuntime>>;
+
+pub struct WorkerRuntime {
+    pub rt_ctx: &'static AsyncRuntimeContext,
+    pub prc: &'static PRuntimeClient,
+    pub status: WorkerRuntimeStatus,
+    pub initial_info: Option<PhactoryInfo>,
+    pub last_info: Option<PhactoryInfo>,
+    pub pr_rpc_port: u32,
+    pub peer_browser_started: bool,
+    pub best_broker: Option<WrappedBrokerPeer>,
+    pub best_broker_name: Option<String>,
+}
+
+impl WorkerRuntime {
+    pub fn new(rt_ctx: &'static AsyncRuntimeContext, prc: &'static PRuntimeClient) -> Self {
+        Self {
+            rt_ctx,
+            prc,
+            status: Starting,
+            initial_info: None,
+            last_info: None,
+            pr_rpc_port: 0,
+            peer_browser_started: false,
+            best_broker: None,
+            best_broker_name: None,
+        }
+    }
+
+    pub fn new_wrapped(
+        rt_ctx: &'static AsyncRuntimeContext,
+        prc: &'static PRuntimeClient,
+    ) -> WrappedWorkerRuntime {
+        Arc::new(RwLock::new(Self::new(rt_ctx, prc)))
+    }
+
+    pub async fn handle_update_info(
+        &mut self,
+        rt_tx: WorkerRuntimeChannelMessageSender,
+        info: PhactoryInfo,
+    ) {
+        let ir = (&info).clone();
+        let current_status = (&self.status).clone();
+        let pr_rpc_port = &ir.network_status.unwrap_or_default().public_rpc_port;
+
+        if pr_rpc_port.is_none() {
+            self.pr_rpc_port = 0;
+            let rt_tx = rt_tx.clone();
+            let _ = rt_tx
+                .clone()
+                .send(WorkerRuntimeChannelMessage::ShouldSetPRuntimeFailed(
+                    "Public port not enabled in pRuntime!".to_string(),
+                ))
+                .await;
+            return;
+        }
+
+        self.pr_rpc_port = pr_rpc_port.unwrap();
+        self.last_info = Some(info.clone());
+
+        match current_status {
+            Starting => {
+                self.initial_info = Some(info);
+                let rt_tx = rt_tx.clone();
+                let _ = rt_tx
+                    .clone()
+                    .send(ShouldUpdateStatus(WaitingForBroker))
+                    .await;
+            }
+            Failed(_) => {
+                info!("Trying to recover from failure in 6s...");
+                sleep(Duration::from_secs(6)).await;
+                self.recover_from_failure(rt_tx.clone()).await;
+            }
+            _ => {}
+        }
+    }
+
+    pub async fn recover_from_failure(&self, rt_tx: WorkerRuntimeChannelMessageSender) {
+        let current_status = (&self.status).clone();
+        match current_status {
+            Failed(_) => {
+                debug!("Resetting runtime status to Starting...");
+                let _ = rt_tx.clone().send(ShouldUpdateStatus(Starting)).await;
+            }
+            _ => {
+                debug!("Triggering `recover_from_failure` from normal status, ignoring.");
+            }
+        }
+    }
+
+    pub async fn handle_lock_peer(
+        &mut self,
+        peer: WrappedBrokerPeer,
+        rt_tx: WorkerRuntimeChannelMessageSender,
+    ) {
+        let np = peer.clone();
+        let np = np.lock().await;
+        let name = np.instance_name.clone();
+        drop(np);
+
+        self.best_broker = Some(peer.clone());
+        self.best_broker_name = Some(name.clone());
+        let _ = rt_tx
+            .clone()
+            .send(ShouldUpdateStatus(PendingProvision))
+            .await;
+        info!("Locked self to broker {}.", name);
+    }
+
+    pub async fn handle_update_status(
+        &mut self,
+        new_status: WorkerRuntimeStatus,
+        tx: BrokerPeerUpdateSender,
+        rt_tx: WorkerRuntimeChannelMessageSender,
+        pr: &PRuntimeClient,
+    ) {
+        let old_status = (&self.status).clone();
+        info!(
+            "Changing worker status from {:?} to {:?}...",
+            &old_status, &new_status
+        );
+        match new_status {
+            Starting => {
+                info!("Starting worker runtime...");
+            }
+            WaitingForBroker => {
+                if !(self.peer_browser_started) {
+                    tokio::spawn(start_peer_browser(tx.clone(), &RT_CTX));
+                    self.peer_browser_started = true;
+                }
+                tokio::spawn(wait_for_broker_peer(rt_tx.clone()));
+            }
+            PendingProvision => {
+                // todo: transition state left for sideVM
+                if let Err(e) = set_pruntime_network_with_peer(
+                    (&self.best_broker).as_ref().unwrap().clone(),
+                    pr,
+                )
+                .await
+                {
+                    let _ = rt_tx
+                        .clone()
+                        .send(ShouldSetBrokerFailed(format!("{}", e)))
+                        .await;
+                } else {
+                    let _ = rt_tx.clone().send(ShouldUpdateStatus(Started)).await;
+                };
+            }
+            Started => {}
+            Failed(_) => {}
+        }
+        self.status = new_status;
+    }
+
+    pub async fn handle_broker_failure(
+        &self,
+        rt_tx: WorkerRuntimeChannelMessageSender,
+        msg: String,
+    ) {
+        let current_status = (&self.status).clone();
+        match current_status {
+            Failed(reason) => {
+                if (&reason.broker).is_some() {
+                    let current_msg = &reason.broker.clone().unwrap();
+                    if current_msg.eq(&msg) {
+                        debug!("Ignoring duplicate fail reason from broker: {}", &msg);
+                        return;
+                    }
+                }
+                let mut reason = reason.clone();
+                reason.broker = Some(msg);
+                let _ = rt_tx.clone().send(ShouldUpdateStatus(Failed(reason))).await;
+            }
+            _ => {
+                let _ = rt_tx
+                    .clone()
+                    .send(ShouldUpdateStatus(Failed(WorkerRuntimeFailReason {
+                        pr: None,
+                        broker: Some(msg),
+                    })))
+                    .await;
+            }
+        };
+    }
+
+    pub async fn handle_pruntime_failure(
+        &self,
+        rt_tx: WorkerRuntimeChannelMessageSender,
+        msg: String,
+    ) {
+        let current_status = (&self.status).clone();
+        match current_status {
+            Failed(reason) => {
+                if (&reason.pr).is_some() {
+                    let current_msg = &reason.pr.clone().unwrap();
+                    if current_msg.eq(&msg) {
+                        debug!("Ignoring duplicate fail reason from pRuntime: {}", &msg);
+                        return;
+                    }
+                }
+                let mut reason = reason.clone();
+                reason.pr = Some(msg);
+                let _ = rt_tx.clone().send(ShouldUpdateStatus(Failed(reason))).await;
+            }
+            _ => {
+                let _ = rt_tx
+                    .clone()
+                    .send(WorkerRuntimeChannelMessage::ShouldUpdateStatus(
+                        WorkerRuntimeStatus::Failed(WorkerRuntimeFailReason {
+                            broker: None,
+                            pr: Some(msg),
+                        }),
+                    ))
+                    .await;
+            }
+        };
+    }
+}
+
+pub async fn wait_for_broker_peer(rt_tx: WorkerRuntimeChannelMessageSender) {
+    info!("Waiting for broker peer...");
+    let mut loop_num: u8 = 0;
+    loop {
+        let wr = WR.read().await;
+        match wr.status {
+            WorkerRuntimeStatus::WaitingForBroker => {
+                let pm = &RT_CTX.peer_manager.broker;
+                let mut pm = pm.lock().await;
+                let peer = pm.verify_best_instance().await;
+                drop(pm);
+                if peer.is_err() {
+                    let e = peer.err().unwrap();
+                    panic!("Failed to verify selected candidate: {}", e);
+                } else {
+                    let peer = peer.unwrap();
+                    if peer.is_some() {
+                        let peer = peer.unwrap();
+                        loop_num += 1;
+                        if loop_num >= 15 {
+                            let _ = rt_tx.clone().send(ShouldLockBroker(peer)).await;
+                            return;
+                        }
+                    } else {
+                        debug!("Broker not found, waiting...");
+                        loop_num += 1;
+                    }
+                }
+            }
+            _ => {
+                return;
+            }
+        }
+        sleep(Duration::from_secs(1)).await;
+        drop(wr);
+    }
+}
+
+pub async fn check_pruntime_health(rt_tx: WorkerRuntimeChannelMessageSender) {
+    info!("Starting pRuntime health checks...");
+    loop {
+        let wr = WR.read().await;
+        let duration = Duration::from_secs(match wr.status {
+            WorkerRuntimeStatus::Starting => 1,
+            WorkerRuntimeStatus::WaitingForBroker => 15,
+            WorkerRuntimeStatus::PendingProvision => 15,
+            WorkerRuntimeStatus::Started => 6,
+            WorkerRuntimeStatus::Failed(_) => 3,
+        });
+        drop(wr);
+
+        sleep(duration).await;
+        let wr = WR.read().await;
+        let pr = wr.prc;
+        let info = pr.get_info(()).await;
+        drop(wr);
+        match info {
+            Ok(info) => {
+                let _ = rt_tx
+                    .clone()
+                    .send(WorkerRuntimeChannelMessage::ShouldUpdateInfo(info))
+                    .await;
+            }
+            Err(err) => {
+                let err = format!("{:?}", err);
+                debug!("Error while fetching info from pRuntime: {:?}", &err);
+                let _ = rt_tx
+                    .clone()
+                    .send(WorkerRuntimeChannelMessage::ShouldSetPRuntimeFailed(err))
+                    .await;
+            }
+        }
+    }
+}
+
+pub async fn start_peer_browser(tx: BrokerPeerUpdateSender, ctx: &AsyncRuntimeContext) {
+    let mdns = ServiceDaemon::new().expect("Failed to create daemon");
+    let pm = &RT_CTX.peer_manager;
+    crate::register_service(&mdns, &ctx).await;
+    pm.browse_brokers(&mdns, tx.clone(), &RT_CTX).await;
+}
+
+async fn set_pruntime_network_with_peer(
+    peer: WrappedBrokerPeer,
+    pr: &PRuntimeClient,
+) -> Result<()> {
+    let p = peer.lock().await;
+    let si = &p.service_info;
+    let props = si.get_properties();
+
+    let oa = props.get("oa").context("Invalid outbound address").unwrap();
+    let oa = oa.split(",").map(|str| str.trim()).collect::<Vec<_>>();
+    let oa = oa.get(0).context("Invalid outbound address").unwrap();
+    let oa = oa.split(":").collect::<Vec<_>>();
+    let oap = *oa.get(1).context("Invalid outbound port").unwrap();
+    let oa = *oa.get(0).context("Invalid outbound address").unwrap();
+    let oa = if oa.eq("0.0.0.0") {
+        let addr = si
+            .get_addresses()
+            .clone()
+            .into_iter()
+            .next()
+            .context("Invalid outbound address")
+            .unwrap();
+        addr.octets().map(|i| i.to_string()).join(".")
+    } else {
+        oa.to_string()
+    };
+    let all_proxy = format!("socks5://{}:{}", oa, oap);
+    drop(p);
+
+    info!("Setting pRuntime outbound proxy to {}.", &all_proxy);
+    if let Err(e) = pr
+        .config_network(NetworkConfig {
+            all_proxy,
+            i2p_proxy: "".to_string(),
+        })
+        .await
+    {
+        return Err(anyhow!("Failed to configure pRuntime network: {:?}", e));
+    }
+    Ok(())
+}
diff --git a/src/lib.rs b/src/lib.rs
index e88835f..0c55fa3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,4 +2,3 @@ pub mod config;
 pub mod mgmt;
 pub mod peer;
 pub mod runtime;
-pub mod worker;
diff --git a/src/peer.rs b/src/peer.rs
deleted file mode 100644
index a1addf9..0000000
--- a/src/peer.rs
+++ /dev/null
@@ -1,331 +0,0 @@
-use anyhow::{anyhow, Context, Result};
-use futures::future::join_all;
-use if_addrs::{IfAddr, Ifv4Addr};
-use log::{trace, warn};
-use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
-use std::collections::{BTreeMap, HashMap};
-use std::fmt::{Debug, Formatter};
-
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::mpsc::{Receiver as mpsc__Receiver, Sender as mpsc__Sender};
-use tokio::sync::{mpsc::Sender as MpscSender, Mutex, RwLock};
-
-use crate::config::{PeerConfig, PeerRole};
-use crate::peer::BrokerPeerUpdate::{BestPeerChanged, PeerStatusChanged};
-use crate::runtime::AsyncRuntimeContext;
-use crate::worker::runtime::WorkerRuntimeChannelMessage;
-
-pub const SERVICE_PSN_LOCAL_WORKER: &str = "_psn-worker._tcp.local.";
-pub const SERVICE_PSN_BROKER: &str = "_psn-broker._tcp.local.";
-pub const SERVICE_LOCAL_DOMAIN: &str = "local.";
-
-pub const BROWSE_LOOP_SLEEP_DURATION: Duration = Duration::from_secs(300);
-pub const BROWSE_SEARCH_INTERVAL: Duration = Duration::from_secs(120);
-pub const BROWSE_RESOLVE_TIMEOUT: Duration = Duration::from_secs(30);
-
-pub type TxtMap = HashMap<String, String>;
-pub type WrappedPeerManager = Arc<RwLock<PeerManager>>;
-pub type WrappedBrokerPeer = Arc<Mutex<BrokerPeer>>;
-pub type BrokerPeerUpdateSender = mpsc__Sender<BrokerPeerUpdate>;
-pub type BrokerPeerUpdateReceiver = mpsc__Receiver<BrokerPeerUpdate>;
-pub type PeerStatusChangeCb = Box;
-
-#[derive(Debug, Clone)]
-pub enum BrokerPeerUpdate {
-    None,
-    PeerStatusChanged(String, PeerStatus),
-    BestPeerChanged(String, String),
-}
-
-#[derive(Debug)]
-pub struct BrokerPeerManager {
-    pub map: BTreeMap<String, Arc<Mutex<BrokerPeer>>>,
-    pub best_instance_name: Option<String>,
-}
-impl BrokerPeerManager {
-    pub fn new() -> Self {
-        let map = BTreeMap::new();
-        Self {
-            map,
-            best_instance_name: None,
-        }
-    }
-    async fn update_peer_with_service_info(
-        &mut self,
-        service_info: ServiceInfo,
-        tx: BrokerPeerUpdateSender,
-        _ctx: &AsyncRuntimeContext,
-    ) -> Result<()> {
-        trace!("Trying to update broker with service info: {service_info:?}");
-        let props = &service_info.get_properties();
-        let instance_name = props.get("in").context("Invalid instance name").unwrap();
-        if instance_name.len() <= 0 {
-            return Err(anyhow!("Invalid instance name"));
-        }
-        let id = props
-            .get("i")
-            .context("Unexpected empty instance id")
-            .unwrap();
-        if id.len() <= 0 {
-            return Err(anyhow!("Unexpected empty instance id"));
-        }
-        let cost = props.get("c").context("Invalid cost").unwrap();
-        let cost = atoi::atoi::<u8>(cost.as_bytes());
-        if cost.is_none() {
-            return Err(anyhow!(
-                "Peer candidate {}:{} has invalid cost",
-                instance_name,
-                id
-            ));
-        }
-        let cost = cost.unwrap();
-        let mgmt_port = service_info.get_port();
-        let mgmt_addr = service_info.get_addresses();
-        let mgmt_addr = mgmt_addr.iter().next().context("Invalid address").unwrap();
-        let mgmt_addr = mgmt_addr.to_string();
-        let mgmt_addr = format!("http://{}:{}", mgmt_addr, mgmt_port);
-
-        if self.map.contains_key(instance_name) {
-            let _peer = self.map.get(instance_name).unwrap();
-            // TODO: warn for id change
-            return Ok(());
-        }
-
-        let map = &mut self.map;
-
-        let mut new_peer = BrokerPeer::new(
-            cost,
-            instance_name,
-            id,
-            service_info.clone(),
-            mgmt_addr,
-            tx.clone(),
-        );
-        new_peer.start_life_check().await;
-        let new_peer = Arc::new(Mutex::new(new_peer));
-        map.insert(instance_name.to_string(), new_peer.clone());
-
-        self.set_best_instance(tx.clone()).await;
-
-        Ok(())
-    }
-
-    fn get_cached_best(&self) -> Option<Arc<Mutex<BrokerPeer>>> {
-        match &self.best_instance_name {
-            Some(n) => Some(self.map.get(n).unwrap().clone()),
-            None => None,
-        }
-    }
-
-    async fn set_best_instance(&mut self, tx: BrokerPeerUpdateSender) -> Result<()> {
-        if let Some(b) = self.get_best_instance().await {
-            let b = b.clone();
-            let bb = b.lock().await;
-            let oa = bb.service_info.get_properties();
-            let oa = oa.get("oa").context("Invalid outbound address").unwrap();
-            self.best_instance_name = Some(bb.instance_name.to_string());
-            if let Err(e) = tx
-                .send(BestPeerChanged(
-                    bb.instance_name.to_string(),
-                    format!("socks5://{}", oa),
-                ))
-                .await
-            {
-                warn!("[set_best_instance] Failed: {:?}", e)
-            };
-        };
-        Ok(())
-    }
-
-    async fn get_best_instance(&self) -> Option<WrappedBrokerPeer> {
-        let v = self.map.values();
-        if v.len() <= 0 {
-            return None;
-        }
-        let v: Vec<_> = v
-            .map(|p| async {
-                let p = p.clone();
-                let p = p.lock().await;
-                let n = p.instance_name.to_string();
-                (n, p.cost)
-            })
-            .collect();
-        let mut v: Vec<(String, u8)> = join_all(v).await;
-        v.sort_by(|a, b| a.1.cmp(&b.1));
-        let v: Vec<_> = v
-            .into_iter()
-            .map(|(n, _cost)| self.map.get(n.as_str()))
-            .collect();
-        Some(v[0].unwrap().clone())
-    }
-}
-
-pub trait PeerLifecycle {
-    fn set_status(&mut self, s: PeerStatus) -> ();
-}
-
-pub struct BrokerPeer {
-    pub cost: u8,
-    pub service_info: ServiceInfo,
-    pub instance_name: String,
-    pub id: String,
-    pub status: PeerStatus,
-    pub mgmt_addr: String,
-    pub tx: BrokerPeerUpdateSender,
-}
-impl BrokerPeer {
-    pub fn new(
-        cost: u8,
-        instance_name: &str,
-        id: &str,
-        service_info: ServiceInfo,
-        mgmt_addr: String,
-        tx: BrokerPeerUpdateSender,
-    ) -> Self {
-        Self {
-            cost,
-            instance_name: instance_name.to_string(),
-            id: id.to_string(),
-            service_info,
-            mgmt_addr,
-            status: PeerStatus::Init,
-            tx,
-        }
-    }
-    async fn set_status(&mut self, s: PeerStatus) {
-        let ns = s.clone();
-        self.status = s;
-        let tx = &self.tx.clone();
-        tx.clone()
-            .send(PeerStatusChanged(self.instance_name.to_string(), ns))
-            .await
-            .expect("Failed to send PeerStatusChanged")
-    }
-    async fn start_life_check(&mut self) {
-        self.set_status(PeerStatus::Start).await;
-    }
-}
-impl Debug for BrokerPeer {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}: {:?}", self.instance_name, self.service_info)
-    }
-}
-
-#[derive(Debug)]
-pub struct LocalPeerWorkerManager {
-    map: BTreeMap<String, LocalWorkerPeer>,
-}
-impl LocalPeerWorkerManager {
-    pub fn new() -> Self {
-        let map = BTreeMap::new();
-        Self { map }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct LocalWorkerPeer {}
-impl LocalWorkerPeer {}
-
-#[derive(Debug)]
-pub struct PeerManager {
-    pub my_role: PeerRole,
-    pub broker: Arc<Mutex<BrokerPeerManager>>,
-    pub local_worker: Arc<Mutex<LocalPeerWorkerManager>>,
-}
-impl PeerManager {
-    // We'll maintain keepalive connections to peers instead of relying on the existence of DNS-SD records for that there are TTLs of 75 minutes for non-host records per RFC6762
-
-    fn init(my_role: PeerRole) -> Self {
-        let broker = BrokerPeerManager::new();
-        let broker = Arc::new(Mutex::new(broker));
-        let local_worker = LocalPeerWorkerManager::new();
-        let local_worker = Arc::new(Mutex::new(local_worker));
-        Self {
-            my_role,
-            broker,
-            local_worker,
-        }
-    }
-
-    pub fn init_for_broker(_config: &PeerConfig) -> Self {
-        Self::init(PeerRole::PrBroker(None))
-    }
-
-    pub fn init_for_local_worker(_config: &PeerConfig) -> Self {
-        Self::init(PeerRole::PrLocalWorker(None))
-    }
-
-    pub async fn browse_local_workers(&self, mdns: &ServiceDaemon, ctx: &AsyncRuntimeContext) {
-        let receiver = mdns
-            .browse(SERVICE_PSN_LOCAL_WORKER)
-            .expect("Failed to browse");
-
-        while let Ok(event) = receiver.recv_async().await {
-            match event {
-                ServiceEvent::ServiceResolved(info) => {
-                    PeerManager::update_local_worker_peer(ctx, info).await;
-                }
-                other_event => {
-                    trace!("[browse_local_workers] {:?}", &other_event);
-                }
-            }
-        }
-    }
-
-    async fn update_local_worker_peer(_ctx: &AsyncRuntimeContext, service_info: ServiceInfo) {
-        trace!("[update_local_worker_peer] Trying to update local worker with service info: {service_info:?}");
-        // TODO: local routing table for local workers
-    }
-
-    pub async fn browse_brokers(
-        &self,
-        mdns: &ServiceDaemon,
-        tx: BrokerPeerUpdateSender,
-        rt_tx: MpscSender<WorkerRuntimeChannelMessage>,
-        ctx: &AsyncRuntimeContext,
-    ) {
-        let receiver = mdns.browse(SERVICE_PSN_BROKER).expect("Failed to browse");
-
-        while let Ok(event) = receiver.recv_async().await {
-            match event {
-                ServiceEvent::ServiceResolved(info) => {
-                    let broker = &self.broker.clone();
-                    let mut broker = broker.lock().await;
-                    let tx = tx.clone();
-                    let _ = broker.update_peer_with_service_info(info, tx, ctx).await;
-                    drop(broker);
-                }
-                other_event => {
-                    trace!("[browse_brokers] {:?}", &other_event);
-                }
-            }
-        }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub enum PeerStatus {
-    Init,
-    Start,
-    Alive,
-    Aiding,
-    Dead,
-}
-
-pub fn my_ipv4_interfaces() -> Vec<Ifv4Addr> {
-    if_addrs::get_if_addrs()
-        .unwrap_or_default()
-        .into_iter()
-        .filter_map(|i| {
-            if i.is_loopback() {
-                None
-            } else {
-                match i.addr {
-                    IfAddr::V4(ifv4) => Some(ifv4),
-                    _ => None,
-                }
-            }
-        })
-        .collect()
-}
diff --git a/src/peer/broker.rs b/src/peer/broker.rs
new file mode 100644
index 0000000..8ce50a8
--- /dev/null
+++ b/src/peer/broker.rs
@@ -0,0 +1,177 @@
+use crate::peer::PeerStatus;
+use anyhow::{anyhow, Context, Result};
+use futures::future::join_all;
+use log::{trace, warn};
+use mdns_sd::ServiceInfo;
+use std::collections::BTreeMap;
+use std::fmt::{Debug, Formatter};
+
+use crate::peer::local_worker::{BrokerPeerUpdateSender, WrappedBrokerPeer};
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+use crate::runtime::AsyncRuntimeContext;
+
+#[derive(Debug)]
+pub struct BrokerPeerManager {
+    pub map: BTreeMap<String, Arc<Mutex<BrokerPeer>>>,
+    pub best_instance_name: Option<String>,
+}
+
+impl BrokerPeerManager {
+    pub fn new() -> Self {
+        let map = BTreeMap::new();
+        Self {
+            map,
+            best_instance_name: None,
+        }
+    }
+    pub(crate) async fn update_peer_with_service_info(
+        &mut self,
+        service_info: ServiceInfo,
+        tx: BrokerPeerUpdateSender,
+        _ctx: &AsyncRuntimeContext,
+    ) -> Result<()> {
+        trace!("Trying to update broker with service info: {service_info:?}");
+        let props = &service_info.get_properties();
+        let instance_name = props.get("in").context("Invalid instance name").unwrap();
+        if instance_name.len() <= 0 {
+            return Err(anyhow!("Invalid instance name"));
+        }
+        let id = props
+            .get("i")
+            .context("Unexpected empty instance id")
+            .unwrap();
+        if id.len() <= 0 {
+            return Err(anyhow!("Unexpected empty instance id"));
+        }
+        let cost = props.get("c").context("Invalid cost").unwrap();
+        let cost = atoi::atoi::<u8>(cost.as_bytes());
+        if cost.is_none() {
+            return Err(anyhow!(
+                "Peer candidate {}:{} has invalid cost",
+                instance_name,
+                id
+            ));
+        }
+        let cost = cost.unwrap();
+        let mgmt_port = service_info.get_port();
+        let mgmt_addr = service_info.get_addresses();
+        let mgmt_addr = mgmt_addr.iter().next().context("Invalid address").unwrap();
+        let mgmt_addr = mgmt_addr.to_string();
+        let mgmt_addr = format!("http://{}:{:?}", mgmt_addr, mgmt_port);
+
+        if self.map.contains_key(instance_name) {
+            let _peer = self.map.get(instance_name).unwrap();
+            // TODO: warn for id change
+            return Ok(());
+        }
+
+        let map = &mut self.map;
+
+        let mut new_peer = BrokerPeer::new(
+            cost,
+            instance_name,
+            id,
+            service_info.clone(),
+            mgmt_addr,
+            tx.clone(),
+        );
+        new_peer.start_life_check().await;
+        let new_peer = Arc::new(Mutex::new(new_peer));
+        map.insert(instance_name.to_string(), new_peer.clone());
+
+        Ok(())
+    }
+
+    fn get_cached_best(&self) -> Option<Arc<Mutex<BrokerPeer>>> {
+        match &self.best_instance_name {
+            Some(n) => Some(self.map.get(n).unwrap().clone()),
+            None => None,
+        }
+    }
+
+    pub async fn verify_best_instance(&mut self) -> Result<Option<WrappedBrokerPeer>> {
+        if let Some(b) = self.get_best_instance().await {
+            let b = b.clone();
+            let bb = b.lock().await;
+            let oa = bb.service_info.get_properties();
+            let _ = oa.get("oa").context("Invalid outbound address").unwrap();
+            self.best_instance_name = Some(bb.instance_name.to_string());
+            return Ok(Some(b.clone()));
+        };
+        Ok(None)
+    }
+
+    async fn get_best_instance(&self) -> Option<WrappedBrokerPeer> {
+        let v = self.map.values();
+        if v.len() <= 0 {
+            return None;
+        }
+        let v: Vec<_> = v
+            .map(|p| async {
+                let p = p.clone();
+                let p = p.lock().await;
+                let n = p.instance_name.to_string();
+                (n, p.cost)
+            })
+            .collect();
+        let mut v: Vec<(String, u8)> = join_all(v).await;
+        v.sort_by(|a, b| a.1.cmp(&b.1));
+        let v: Vec<_> = v
+            .into_iter()
+            .map(|(n, _cost)| self.map.get(n.as_str()))
+            .collect();
+        Some(v[0].unwrap().clone())
+    }
+}
+
+pub struct BrokerPeer {
+    pub cost: u8,
+    pub service_info: ServiceInfo,
+    pub instance_name: String,
+    pub id: String,
+    pub status: PeerStatus,
+    pub mgmt_addr: String,
+    pub tx: BrokerPeerUpdateSender,
+}
+
+impl BrokerPeer {
+    pub fn new(
+        cost: u8,
+        instance_name: &str,
+        id: &str,
+        service_info: ServiceInfo,
+        mgmt_addr: String,
+        tx: BrokerPeerUpdateSender,
+    ) -> Self {
+        Self {
+            cost,
+            instance_name: instance_name.to_string(),
+            id: id.to_string(),
+            service_info,
+            mgmt_addr,
+            status: PeerStatus::Init,
+            tx,
+        }
+    }
+    async fn set_status(&mut self, s: PeerStatus) {
+        self.status = s;
+    }
+    async fn start_life_check(&mut self) {
+        self.set_status(PeerStatus::Start).await;
+    }
+}
+
+impl Debug for BrokerPeer {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}: {:?}", self.instance_name, self.service_info)
+    }
+}
+
+#[derive(Debug, Clone)]
+pub enum BrokerPeerUpdate {
+    None,
+    PeerStatusChanged(String, PeerStatus),
+    BestPeerChanged(String, String),
+}
diff --git a/src/peer/local_worker.rs b/src/peer/local_worker.rs
new file mode 100644
index 0000000..39ee04d
--- /dev/null
+++ b/src/peer/local_worker.rs
@@ -0,0 +1,26 @@
+use crate::peer::broker::{BrokerPeer, BrokerPeerUpdate};
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use tokio::sync::mpsc::{Receiver as mpsc__Receiver, Sender as mpsc__Sender};
+use tokio::sync::Mutex;
+
+#[derive(Debug)]
+pub struct LocalPeerWorkerManager {
+    map: BTreeMap<String, LocalWorkerPeer>,
+}
+
+impl LocalPeerWorkerManager {
+    pub fn new() -> Self {
+        let map = BTreeMap::new();
+        Self { map }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct LocalWorkerPeer {}
+
+impl LocalWorkerPeer {}
+
+pub type WrappedBrokerPeer = Arc<Mutex<BrokerPeer>>;
+pub type BrokerPeerUpdateSender = mpsc__Sender<BrokerPeerUpdate>;
+pub type BrokerPeerUpdateReceiver = mpsc__Receiver<BrokerPeerUpdate>;
diff --git a/src/peer/mod.rs b/src/peer/mod.rs
new file mode 100644
index 0000000..48b3ec5
--- /dev/null
+++ b/src/peer/mod.rs
@@ -0,0 +1,136 @@
+pub mod broker;
+pub mod local_worker;
+
+use crate::peer::broker::{BrokerPeer, BrokerPeerManager};
+use anyhow::{anyhow, Context, Result};
+use futures::future::join_all;
+use if_addrs::{IfAddr, Ifv4Addr};
+use log::{trace, warn};
+use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
+use std::collections::{BTreeMap, HashMap};
+use std::fmt::{Debug, Formatter};
+
+use broker::BrokerPeerUpdate;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as mpsc__Receiver, Sender as mpsc__Sender};
+use tokio::sync::{mpsc::Sender as MpscSender, Mutex, RwLock};
+
+use crate::config::{PeerConfig, PeerRole};
+use crate::runtime::AsyncRuntimeContext;
+use broker::BrokerPeerUpdate::{BestPeerChanged, PeerStatusChanged};
+use local_worker::{BrokerPeerUpdateSender, LocalPeerWorkerManager};
+
+pub const SERVICE_PSN_LOCAL_WORKER: &str = "_psn-worker._tcp.local.";
+pub const SERVICE_PSN_BROKER: &str = "_psn-broker._tcp.local.";
+pub const SERVICE_LOCAL_DOMAIN: &str = "local.";
+
+pub const BROWSE_LOOP_SLEEP_DURATION: Duration = Duration::from_secs(300);
+pub const BROWSE_SEARCH_INTERVAL: Duration = Duration::from_secs(120);
+pub const BROWSE_RESOLVE_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub type TxtMap = HashMap<String, String>;
+pub type WrappedPeerManager = Arc<RwLock<PeerManager>>;
+pub type PeerStatusChangeCb = Box;
+
+#[derive(Debug)]
+pub struct PeerManager {
+    pub my_role: PeerRole,
+    pub broker: Arc<Mutex<BrokerPeerManager>>,
+    pub local_worker: Arc<Mutex<LocalPeerWorkerManager>>,
+}
+impl PeerManager {
+    // We'll maintain keepalive connections to peers instead of relying on
+    // the existence of DNS-SD records, since non-host records carry a TTL
+    // of 75 minutes per RFC 6762.
+
+    fn init(my_role: PeerRole) -> Self {
+        let broker = BrokerPeerManager::new();
+        let broker = Arc::new(Mutex::new(broker));
+        let local_worker = LocalPeerWorkerManager::new();
+        let local_worker = Arc::new(Mutex::new(local_worker));
+        Self {
+            my_role,
+            broker,
+            local_worker,
+        }
+    }
+
+    pub fn init_for_broker(_config: &PeerConfig) -> Self {
+        Self::init(PeerRole::PrBroker(None))
+    }
+
+    pub fn init_for_local_worker(_config: &PeerConfig) -> Self {
+        Self::init(PeerRole::PrLocalWorker(None))
+    }
+
+    pub async fn browse_local_workers(&self, mdns: &ServiceDaemon, ctx: &AsyncRuntimeContext) {
+        let receiver = mdns
+            .browse(SERVICE_PSN_LOCAL_WORKER)
+            .expect("Failed to browse");
+
+        while let Ok(event) = receiver.recv_async().await {
+            match event {
+                ServiceEvent::ServiceResolved(info) => {
+                    PeerManager::update_local_worker_peer(ctx, info).await;
+                }
+                other_event => {
+                    trace!("[browse_local_workers] {:?}", &other_event);
+                }
+            }
+        }
+    }
+
+    async fn update_local_worker_peer(_ctx: &AsyncRuntimeContext, service_info: ServiceInfo) {
+        trace!("[update_local_worker_peer] Trying to update local worker with service info: {service_info:?}");
+        // TODO: local routing table for local workers
+    }
+
+    pub async fn browse_brokers(
+        &self,
+        mdns: &ServiceDaemon,
+        tx: BrokerPeerUpdateSender,
+        ctx: &AsyncRuntimeContext,
+    ) {
+        let receiver = mdns.browse(SERVICE_PSN_BROKER).expect("Failed to browse");
+
+        while let Ok(event) = receiver.recv_async().await {
+            match event {
+                ServiceEvent::ServiceResolved(info) => {
+                    let broker = &self.broker.clone();
+                    let mut broker = broker.lock().await;
+                    let tx = tx.clone();
+                    let _ = broker.update_peer_with_service_info(info, tx, ctx).await;
+                    drop(broker);
+                }
+                other_event => {
+                    trace!("[browse_brokers] {:?}", &other_event);
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub enum PeerStatus {
+    Init,
+    Start,
+    Alive,
+    Aiding,
+    Dead,
+}
+
+pub fn my_ipv4_interfaces() -> Vec<Ifv4Addr> {
+    if_addrs::get_if_addrs()
+        .unwrap_or_default()
+        .into_iter()
+        .filter_map(|i| {
+            if i.is_loopback() {
+                None
+            } else {
+                match i.addr {
+                    IfAddr::V4(ifv4) => Some(ifv4),
+                    _ => None,
+                }
+            }
+        })
+        .collect()
+}
diff --git a/src/worker/mod.rs b/src/worker/mod.rs
deleted file mode 100644
index ea62c41..0000000
--- a/src/worker/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod runtime;
diff --git a/src/worker/runtime.rs b/src/worker/runtime.rs
deleted file mode 100644
index 6694daa..0000000
--- a/src/worker/runtime.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-use crate::runtime::AsyncRuntimeContext;
-use log::{debug, info};
-use phactory_api::prpc::client::Error;
-use phactory_api::prpc::PhactoryInfo;
-use phactory_api::pruntime_client::PRuntimeClient;
-use std::fmt::Debug;
-use std::sync::Arc;
-use tokio::sync::mpsc::Sender;
-use tokio::sync::RwLock;
-
-#[derive(Debug, Clone, PartialEq)]
-pub enum WorkerRuntimeChannelMessage {
-    ShouldUpdateInfo(PhactoryInfo),
-    ShouldUpdateStatus(WorkerRuntimeStatus),
-}
-
-#[derive(Debug, Clone, PartialEq)]
-pub enum WorkerRuntimeStatus {
-    Starting,
-    WaitingForBroker,
-    PendingProvision,
-    Started,
-    BrokerFailed,
-}
-
-pub type WrappedWorkerRuntime = Arc<RwLock<WorkerRuntime>>;
-
-pub struct WorkerRuntime {
-    pub rt_ctx: &'static AsyncRuntimeContext,
-    pub prc: &'static PRuntimeClient,
-    pub status: WorkerRuntimeStatus,
-    pub initial_info: Option<PhactoryInfo>,
-}
-
-impl WorkerRuntime {
-    pub fn new(rt_ctx: &'static AsyncRuntimeContext, prc: &'static PRuntimeClient) -> Self {
-        Self {
-            rt_ctx,
-            prc,
-            status: WorkerRuntimeStatus::Starting,
-            initial_info: None,
-        }
-    }
-
-    pub fn new_wrapped(
-        rt_ctx: &'static AsyncRuntimeContext,
-        prc: &'static PRuntimeClient,
-    ) -> WrappedWorkerRuntime {
-        Arc::new(RwLock::new(Self::new(rt_ctx, prc)))
-    }
-
-    pub fn handle_update_info(&mut self, info: PhactoryInfo) {
-        info!("Updating info from pRuntime: {:?}", &info);
-        self.initial_info = Some(info);
-    }
-
-    pub fn handle_update_status(&mut self, s: WorkerRuntimeStatus) {
-        info!("Worker status from {:?} changed to {:?}", &self.status, &s);
-        self.status = s;
-    }
-}
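Taken together, the new runtime.rs implements the status flow Starting → WaitingForBroker → PendingProvision → Started, with any broker or pRuntime failure parking the worker in Failed and `check_pruntime_health` eventually driving it back to Starting. The diff adds rust-fsm to Cargo.toml but still wires these transitions by hand. A compact, illustrative restatement of the transition table follows; the `Event` names are hypothetical stand-ins (the real code reacts to `WorkerRuntimeChannelMessage` variants), not the project's API:

```rust
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Starting,
    WaitingForBroker,
    PendingProvision,
    Started,
    Failed(String),
}

#[derive(Debug)]
enum Event {
    InfoOk,          // pRuntime reachable with a public port enabled
    BrokerLocked,    // a broker peer was verified and locked
    ProvisionOk,     // pRuntime outbound proxy configured
    Failure(String), // any broker/pRuntime failure report
    Recovered,       // retry after the failure back-off
}

// Pure transition function mirroring handle_update_status's flow.
fn next(status: Status, event: Event) -> Status {
    match (status, event) {
        (Status::Starting, Event::InfoOk) => Status::WaitingForBroker,
        (Status::WaitingForBroker, Event::BrokerLocked) => Status::PendingProvision,
        (Status::PendingProvision, Event::ProvisionOk) => Status::Started,
        (_, Event::Failure(reason)) => Status::Failed(reason),
        (Status::Failed(_), Event::Recovered) => Status::Starting,
        (s, _) => s, // anything else leaves the status unchanged
    }
}

fn main() {
    let s = next(Status::Starting, Event::InfoOk);
    let s = next(s, Event::BrokerLocked);
    assert_eq!(next(s, Event::ProvisionOk), Status::Started);
}
```

Expressing the transitions as one pure function (or as a rust-fsm state machine, now that the dependency is in place) would make illegal transitions unrepresentable instead of scattering them across `handle_update_status` and the failure handlers.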