Skip to content

Commit

Permalink
feat(broker): local worker peer lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
krhougs committed Oct 19, 2022
1 parent 8fcce80 commit 6c558ec
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 113 deletions.
1 change: 0 additions & 1 deletion .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ prost = "0.11.0"
fast-socks5 = { git = "https://github.com/Phala-Network/fast-socks5" }
trust-dns-resolver = { version = "0.22.0", features = ["dns-over-https-rustls"] }
rust-fsm = "0.6.0"
urlparse = "0.7.3"

[workspace]
40 changes: 24 additions & 16 deletions src/bin/service_broker/local_worker.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::local_worker::KnownLocalWorkerStatus::*;
use crate::local_worker::LocalWorkerManagerChannelMessage::*;
use crate::mgmt::local_worker::LocalWorkerIdentity;
use log::{error, info, warn};
use crate::LW_MAP;
use log::{debug, error, info, warn};
use service_network::config::{BROKER_PEER_DEAD_COUNT, BROKER_PEER_LOST_COUNT};
use service_network::mgmt_types::LocalWorkerIdentity;
use std::collections::BTreeMap;
use std::net::Ipv4Addr;
use std::str::FromStr;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::RwLock;
use urlparse::{urlparse, GetQuery};

pub enum LocalWorkerManagerChannelMessage {
ShouldCheckPeerHealth,
Expand All @@ -17,21 +21,24 @@ pub type LocalWorkerManagerChannelMessageSender = Sender<LocalWorkerManagerChann
/// Receiving half of the channel that carries `LocalWorkerManagerChannelMessage`
/// values; `local_worker_manager` takes one as its `rx` argument.
pub type LocalWorkerManagerChannelMessageReceiver = Receiver<LocalWorkerManagerChannelMessage>;

/// Known local workers in a `BTreeMap` with `String` keys. `local_worker_manager`
/// looks entries up by the worker's `public_key`, and `create_local_worker`
/// inserts under a clone of that same key.
pub type LocalWorkerMap = BTreeMap<String, KnownLocalWorker>;
/// A `LocalWorkerMap` wrapped in `Arc<RwLock<..>>` (the `RwLock` imported from
/// `tokio::sync` in this file). The `LW_MAP` static declared in `main.rs` has
/// this type, and `check_peer_health` takes one by value.
pub type WrappedLocalWorkerMap = Arc<RwLock<LocalWorkerMap>>;

pub async fn local_worker_manager(
_tx: LocalWorkerManagerChannelMessageSender,
mut rx: LocalWorkerManagerChannelMessageReceiver,
) {
let mut lw_vec_keys: Vec<String> = Vec::new();
let mut lw_map: LocalWorkerMap = BTreeMap::new();
let lw_map_lock = LW_MAP.clone();

loop {
while let Some(msg) = rx.recv().await {
match msg {
ShouldCheckPeerHealth => {
check_peer_health(&mut lw_map, &lw_vec_keys);
check_peer_health((&lw_map_lock).clone(), &lw_vec_keys).await;
}
ReceivedKeepAlive(lw) => {
let lw_map = (&lw_map_lock).clone();
let mut lw_map = lw_map.write().await;
let key = lw.public_key.as_str();
let klw = lw_map.get_mut(key);
if let Some(mut klw) = klw {
Expand All @@ -46,6 +53,7 @@ pub async fn local_worker_manager(
} else {
lw_vec_keys = create_local_worker(&mut lw_map, lw);
}
drop(lw_map);
}
}
}
Expand All @@ -62,14 +70,9 @@ fn create_local_worker(lw_map: &mut LocalWorkerMap, lw: LocalWorkerIdentity) ->
} = lw;

let key = public_key.clone();
let address = Ipv4Addr::from_str(address_string.as_str()).expect(
format!(
"Invalid IP address for worker {}: {}",
instance_name.as_str(),
address_string.as_str()
)
.as_str(),
);
let uri = urlparse(address_string.as_str());
let hostname = uri.hostname.unwrap();
let forwarder_port = uri.port.unwrap();

info!(
"Hello new worker({}/{}).",
Expand All @@ -79,7 +82,8 @@ fn create_local_worker(lw_map: &mut LocalWorkerMap, lw: LocalWorkerIdentity) ->

let ret = KnownLocalWorker {
status: Active,
address,
hostname,
forwarder_port,
instance_name,
instance_id,
address_string,
Expand All @@ -88,6 +92,7 @@ fn create_local_worker(lw_map: &mut LocalWorkerMap, lw: LocalWorkerIdentity) ->
lost_count: 0,
lost_mark: false,
};
debug!("KnownLocalWorker: {:?}", &ret);
lw_map.insert(key, ret);
lw_map
.iter()
Expand Down Expand Up @@ -150,7 +155,8 @@ fn update_local_worker(klw: &mut KnownLocalWorker, lw: LocalWorkerIdentity) {
}
}

fn check_peer_health(lw_map: &mut LocalWorkerMap, lw_vec_keys: &Vec<String>) {
async fn check_peer_health(lw_map_lock: WrappedLocalWorkerMap, lw_vec_keys: &Vec<String>) {
let mut lw_map = lw_map_lock.write().await;
let _ = lw_vec_keys.iter().for_each(|k| {
let lw = lw_map.get_mut(k);
if let Some(mut lw) = lw {
Expand Down Expand Up @@ -180,6 +186,7 @@ fn check_peer_health(lw_map: &mut LocalWorkerMap, lw_vec_keys: &Vec<String>) {
}
}
});
drop(lw_map);
}

#[derive(Clone, Debug)]
Expand All @@ -192,7 +199,8 @@ pub enum KnownLocalWorkerStatus {
#[derive(Clone, Debug)]
pub struct KnownLocalWorker {
pub status: KnownLocalWorkerStatus,
pub address: Ipv4Addr,
pub hostname: String,
pub forwarder_port: u16,
pub address_string: String,
pub public_port: u16,
pub public_key: String,
Expand Down
16 changes: 10 additions & 6 deletions src/bin/service_broker/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ mod outbound;

use crate::local_worker::{
local_worker_manager, LocalWorkerManagerChannelMessage, LocalWorkerManagerChannelMessageSender,
WrappedLocalWorkerMap,
};
use crate::LocalWorkerManagerChannelMessage::ShouldCheckPeerHealth;
use env_logger::{Builder as LoggerBuilder, Target};
use futures::future::try_join_all;
use log::{debug, info};
use mdns_sd::{ServiceDaemon, ServiceInfo};
use service_network::config::{PeerConfig, PeerRole, BROKER_HEALTH_CHECK_INTERVAL};
use service_network::peer::{my_ipv4_interfaces, SERVICE_PSN_BROKER};
use service_network::runtime::AsyncRuntimeContext;
use std::collections::HashMap;
use service_network::utils::join_handles;
use std::collections::{BTreeMap, HashMap};
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::channel;
use tokio::sync::RwLock;
use tokio::time::sleep;

#[macro_use]
Expand All @@ -33,6 +36,7 @@ lazy_static! {
GIT_REVISION.to_string(),
);
pub static ref RT_CTX: AsyncRuntimeContext = AsyncRuntimeContext::new(CONFIG.clone());
pub static ref LW_MAP: WrappedLocalWorkerMap = Arc::new(RwLock::new(BTreeMap::new()));
}

#[tokio::main]
Expand All @@ -50,14 +54,14 @@ async fn main() {

let (tx, rx) = channel::<LocalWorkerManagerChannelMessage>(1024);

let async_handles = vec![
join_handles(vec![
tokio::spawn(broker()),
tokio::spawn(outbound::start(&RT_CTX)),
tokio::spawn(mgmt::start_server(tx.clone(), &RT_CTX, &CONFIG)),
tokio::spawn(local_worker_manager(tx.clone(), rx)),
tokio::spawn(check_peer_health_loop(tx.clone())),
];
try_join_all(async_handles).await.expect("main failed");
])
.await;
}

async fn broker() {
Expand Down Expand Up @@ -116,7 +120,7 @@ async fn register_service(mdns: &ServiceDaemon, _ctx: &AsyncRuntimeContext) {

/// Health-check ticker for the broker.
///
/// Loops forever: sleeps for `BROKER_HEALTH_CHECK_INTERVAL` milliseconds, then
/// sends `ShouldCheckPeerHealth` on `tx`. The result of each send is discarded,
/// and the loop has no exit condition.
async fn check_peer_health_loop(tx: LocalWorkerManagerChannelMessageSender) {
loop {
// NOTE(review): this listing is a rendered diff without +/- markers. The two
// `sleep` lines below look like the removed and the added form of a single
// statement (the `as u64` cast being dropped), not two consecutive sleeps.
// Confirm against the real file.
sleep(Duration::from_millis(BROKER_HEALTH_CHECK_INTERVAL as u64)).await;
sleep(Duration::from_millis(BROKER_HEALTH_CHECK_INTERVAL)).await;
// The send result is deliberately ignored via `let _`.
let _ = tx.clone().send(ShouldCheckPeerHealth).await;
}
}
24 changes: 4 additions & 20 deletions src/bin/service_broker/mgmt/local_worker.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,15 @@
use crate::mgmt::{BrokerMgmtShared, MyIdentity};
use crate::mgmt::BrokerMgmtShared;
use crate::LocalWorkerManagerChannelMessage::ReceivedKeepAlive;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{extract::Extension, Json};
use serde::{Deserialize, Serialize};
use service_network::mgmt_types::LocalWorkerIdentity;
use std::sync::Arc;

/// Identity a local worker reports with a keepalive; `handle_keepalive`
/// receives it as the JSON request body.
///
/// NOTE(review): this diff also adds
/// `use service_network::mgmt_types::LocalWorkerIdentity;`, so this local
/// definition appears to be the removed side of the change (the type moving to
/// the shared crate). Confirm against the real file.
#[derive(Clone, Serialize, Deserialize)]
pub struct LocalWorkerIdentity {
// Worker's instance name; logged by `create_local_worker`.
pub instance_name: String,
// Worker's instance id.
pub instance_id: String,
// Worker address as text; `create_local_worker` parses it.
pub address_string: String,
// Port number reported by the worker; its use is not visible in this view.
pub public_port: u16,
// Used as the key into the local worker map.
pub public_key: String,
}

/// Handler for the local-worker keepalive route (registered with `put(..)` in
/// `create_router`).
///
/// Takes the shared management state and the JSON-decoded `LocalWorkerIdentity`,
/// forwards the identity to the local worker manager as `ReceivedKeepAlive`
/// (the send result is ignored), and replies with `StatusCode::IM_A_TEAPOT`
/// plus this broker's identity.
///
/// NOTE(review): this listing is a rendered diff without +/- markers, so the
/// body below contains both versions of the handler back to back. The first
/// send + `Json(MyIdentity { .. })` tuple looks like the removed code; the
/// second send + `shared.my_id.clone()` tuple looks like its replacement.
/// Confirm against the real file before relying on either form.
pub async fn handle_keepalive(
Extension(shared): Extension<Arc<BrokerMgmtShared>>,
Json(lwi): Json<LocalWorkerIdentity>,
) -> impl IntoResponse {
let config = &shared.config;
// Variant A: clones the identity for the channel and builds the response
// identity from the config on every request.
let _ = shared.tx.clone().send(ReceivedKeepAlive(lwi.clone())).await;
(
StatusCode::IM_A_TEAPOT,
Json(MyIdentity {
instance_name: config.common.instance_name.to_string(),
instance_id: config.instance_id.to_string(),
}),
)
// Variant B: moves the identity into the message and returns the identity
// string stored in the shared state.
let _ = shared.tx.clone().send(ReceivedKeepAlive(lwi)).await;
(StatusCode::IM_A_TEAPOT, shared.my_id.clone())
}
17 changes: 9 additions & 8 deletions src/bin/service_broker/mgmt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,31 @@ use axum::routing::put;
use axum::{Extension, Router};
use hyper::Server;
use log::info;
use serde::{Deserialize, Serialize};
use service_network::config::PeerConfig;
use service_network::mgmt_types::{MyIdentity, R_V0_LOCAL_WORKER_KEEPALIVE};
use service_network::runtime::AsyncRuntimeContext;
use std::sync::Arc;

/// State shared with the management API handlers; `handle_keepalive` receives
/// it as `Extension<Arc<BrokerMgmtShared>>`.
pub struct BrokerMgmtShared {
// Channel sender used by handlers to forward messages such as
// `ReceivedKeepAlive` to the local worker manager.
pub tx: LocalWorkerManagerChannelMessageSender,
// Clone of the peer configuration passed to `start_server`.
pub config: PeerConfig,
// This broker's `MyIdentity`, serialized to a JSON string once in
// `start_server`; `handle_keepalive` returns a clone of it as the response body.
pub my_id: String,
}

pub async fn start_server(
tx: LocalWorkerManagerChannelMessageSender,
_ctx: &AsyncRuntimeContext,
config: &'static PeerConfig,
) {
let my_id = MyIdentity {
instance_name: config.common.instance_name.to_string(),
instance_id: config.instance_id.to_string(),
};
let my_id = serde_json::to_string(&my_id).unwrap();
let shared = BrokerMgmtShared {
tx,
config: config.clone(),
my_id,
};
tokio::spawn(async move {
let router = create_router();
Expand All @@ -46,15 +53,9 @@ fn create_router() -> Router {

// Internal API v0
let router = router.route(
"/v0/local_worker/keepalive",
R_V0_LOCAL_WORKER_KEEPALIVE,
put(local_worker::handle_keepalive),
);

router
}

/// Broker identity returned in the keepalive response: the instance name and
/// instance id, both filled from the peer configuration.
///
/// NOTE(review): this diff also adds an import of `MyIdentity` from
/// `service_network::mgmt_types`, so this local definition appears to be the
/// removed side of the change. Confirm against the real file.
#[derive(Serialize, Deserialize)]
pub struct MyIdentity {
// Filled from `config.common.instance_name`.
instance_name: String,
// Filled from `config.instance_id`.
instance_id: String,
}
26 changes: 10 additions & 16 deletions src/bin/service_worker/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ pub mod runtime;
use crate::runtime::*;
use crate::WorkerRuntimeChannelMessage::*;
use env_logger::{Builder as LoggerBuilder, Target};
use futures::future::try_join_all;
use log::{debug, info};
use log::{debug, error, info};
use mdns_sd::{ServiceDaemon, ServiceInfo};
use phactory_api::pruntime_client::{new_pruntime_client, PRuntimeClient};
use service_network::config::{PeerConfig, PeerRole};
use service_network::peer::local_worker::BrokerPeerUpdateSender;
use service_network::peer::{my_ipv4_interfaces, SERVICE_PSN_LOCAL_WORKER};
use service_network::runtime::AsyncRuntimeContext;
use service_network::utils::join_handles;
use std::collections::HashMap;
use std::net::Ipv4Addr;

Expand All @@ -30,7 +30,10 @@ lazy_static! {
pub static ref RT_CTX: AsyncRuntimeContext = AsyncRuntimeContext::new(CONFIG.clone());
pub static ref PRUNTIME_CLIENT: PRuntimeClient =
new_pruntime_client(CONFIG.local_worker().pruntime_address.to_string());
pub static ref WR: WrappedWorkerRuntime = WorkerRuntime::new_wrapped(&RT_CTX, &PRUNTIME_CLIENT);
pub static ref MDNS: ServiceDaemon = { ServiceDaemon::new().expect("Failed to create daemon") };
pub static ref WR: WrappedWorkerRuntime =
WorkerRuntime::new_wrapped(&MDNS, &RT_CTX, &PRUNTIME_CLIENT);
pub static ref REQ_CLIENT: reqwest::Client = reqwest::Client::new();
}

#[tokio::main]
Expand All @@ -49,16 +52,16 @@ async fn main() {
let (update_best_peer_tx, _) = tokio::sync::mpsc::channel(1024);
let (rt_tx, rt_rx) = tokio::sync::mpsc::channel(1024);

let async_handles = vec![
join_handles(vec![
tokio::spawn(handle_runtime_events(
update_best_peer_tx.clone(),
rt_tx.clone(),
rt_rx,
)),
tokio::spawn(check_pruntime_health(rt_tx.clone())),
];

try_join_all(async_handles).await.expect("main failed");
tokio::spawn(check_current_broker_health_loop(rt_tx.clone())),
])
.await;
}

async fn handle_runtime_events(
Expand Down Expand Up @@ -99,15 +102,6 @@ async fn handle_runtime_events(
}
}

// async fn handle_peer_update(
// mut rx: BrokerPeerUpdateReceiver,
// rt_tx: WorkerRuntimeChannelMessageSender,
// ) {
// while let Some(u) = rx.recv().await {
// let _ = rt_tx.clone().send(ShouldUpdateBrokerPeer(u)).await;
// }
// }

async fn register_service(mdns: &ServiceDaemon, _ctx: &AsyncRuntimeContext) {
let common_config = &CONFIG.common;
let _worker_config = CONFIG.local_worker();
Expand Down
Loading

0 comments on commit 6c558ec

Please sign in to comment.