Skip to content

Commit

Permalink
config network on pruntime
Browse files Browse the repository at this point in the history
  • Loading branch information
krhougs committed Oct 16, 2022
1 parent 4f00fa7 commit ec97b09
Show file tree
Hide file tree
Showing 12 changed files with 798 additions and 489 deletions.
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"args": [],
"cwd": "${workspaceFolder}",
"env": {
"PSN_MGMT_PORT": "1918",
"PSN_BROKER_INBOUND_HTTP_SERVER_ACCESSIBLE_ADDRESS_PREFIX": "http://example.com",
"RUST_LOG": "service_network=trace,service_broker=trace"
}
Expand Down
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ phactory-api = { git = "https://github.com/Phala-Network/phala-blockchain", vers
prost = "0.11.0"
fast-socks5 = { git = "https://github.com/Phala-Network/fast-socks5" }
trust-dns-resolver = { version = "0.22.0", features = ["dns-over-https-rustls"] }
rust-fsm = "0.6.0"

[workspace]
145 changes: 50 additions & 95 deletions src/bin/service_worker/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use crate::runtime::*;
use crate::WorkerRuntimeChannelMessage::*;
use anyhow::Result;
use env_logger::{Builder as LoggerBuilder, Target};
use futures::future::try_join_all;
use log::{debug, info, warn};
use mdns_sd::{ServiceDaemon, ServiceInfo};
use phactory_api::pruntime_client::{new_pruntime_client, PRuntimeClient};
use service_network::config::{PeerConfig, PeerRole};
use service_network::peer::{
my_ipv4_interfaces, BrokerPeerUpdate, BrokerPeerUpdateReceiver, BrokerPeerUpdateSender,
SERVICE_PSN_LOCAL_WORKER,
};
use service_network::peer::broker::BrokerPeerUpdate;
use service_network::peer::local_worker::{BrokerPeerUpdateReceiver, BrokerPeerUpdateSender};
use service_network::peer::{my_ipv4_interfaces, SERVICE_PSN_LOCAL_WORKER};
use service_network::runtime::AsyncRuntimeContext;
use service_network::worker::runtime::WorkerRuntimeChannelMessage::{
ShouldUpdateInfo, ShouldUpdateStatus,
};
use service_network::worker::runtime::{
WorkerRuntime, WorkerRuntimeChannelMessage, WorkerRuntimeStatus, WrappedWorkerRuntime,
};
use std::collections::HashMap;
use std::net::Ipv4Addr;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::Sender;

pub mod runtime;

#[macro_use]
extern crate lazy_static;
Expand Down Expand Up @@ -51,111 +49,68 @@ async fn main() {
);
debug!("Staring local worker broker with config: {:?}", &*CONFIG);

let (update_best_peer_tx, update_best_peer_rx) = tokio::sync::mpsc::channel(32);
let (update_best_peer_tx, _) = tokio::sync::mpsc::channel(1024);
let (rt_tx, rt_rx) = tokio::sync::mpsc::channel(1024);

let handle1 = tokio::spawn(handle_update_best_peer(update_best_peer_rx, CONFIG.clone()));
let handle2 = tokio::spawn(runtime(update_best_peer_tx.clone(), rt_tx.clone(), rt_rx));

let wr = WR.read().await;
let pr = wr.prc;
let info = pr
.get_info(())
.await
.expect("Failed to get info from pruntime");
drop(wr);
let _ = rt_tx
.clone()
.send(WorkerRuntimeChannelMessage::ShouldUpdateInfo(info))
.await;

let _ = tokio::join!(handle1, handle2);
}
let async_handles = vec![
tokio::spawn(handle_runtime_events(
update_best_peer_tx.clone(),
rt_tx.clone(),
rt_rx,
)),
tokio::spawn(check_pruntime_health(rt_tx.clone())),
];

async fn local_worker(
tx: BrokerPeerUpdateSender,
rt_tx: Sender<WorkerRuntimeChannelMessage>,
ctx: &AsyncRuntimeContext,
) {
let mdns = ServiceDaemon::new().expect("Failed to create daemon");
let pm = &RT_CTX.peer_manager;
register_service(&mdns, &ctx).await;
pm.browse_brokers(&mdns, tx.clone(), rt_tx.clone(), &RT_CTX)
.await;
try_join_all(async_handles).await.expect("main failed");
}

async fn runtime(
async fn handle_runtime_events(
tx: BrokerPeerUpdateSender,
rt_tx: Sender<WorkerRuntimeChannelMessage>,
mut rx: Receiver<WorkerRuntimeChannelMessage>,
rt_tx: WorkerRuntimeChannelMessageSender,
mut rx: WorkerRuntimeChannelMessageReceiver,
) {
while let Some(msg) = rx.recv().await {
match msg {
ShouldUpdateInfo(i) => {
ShouldSetPRuntimeFailed(msg) => {
let wr = WR.read().await;
wr.handle_pruntime_failure(rt_tx.clone(), msg).await;
drop(wr);
}
ShouldSetBrokerFailed(msg) => {
let wr = WR.read().await;
wr.handle_broker_failure(rt_tx.clone(), msg).await;
drop(wr);
}
ShouldUpdateInfo(info) => {
let mut wr = WR.write().await;
wr.handle_update_info(i);
if matches!(wr.status, WorkerRuntimeStatus::Starting) {
let rt_tx = rt_tx.clone();
let _ = rt_tx
.send(WorkerRuntimeChannelMessage::ShouldUpdateStatus(
WorkerRuntimeStatus::WaitingForBroker,
))
.await;
}
wr.handle_update_info(rt_tx.clone(), info).await;
drop(wr);
}
ShouldUpdateStatus(s) => {
let mut wr = WR.write().await;
wr.handle_update_status(s);
if matches!(wr.status, WorkerRuntimeStatus::WaitingForBroker) {
tokio::spawn(local_worker(tx.clone(), rt_tx.clone(), &RT_CTX));
}
let pr = wr.prc;
wr.handle_update_status(s, tx.clone(), rt_tx.clone(), pr)
.await;
drop(wr);
}
}
}
}

async fn set_pruntime_network_with_peer(socks_url: String, _config: PeerConfig) -> Result<()> {
debug!(
"Trying to set pRuntime outbound with {}",
socks_url.as_str()
);

Ok(())
}

async fn handle_update_best_peer(mut rx: BrokerPeerUpdateReceiver, config: PeerConfig) {
while let Some(u) = rx.recv().await {
match u {
BrokerPeerUpdate::PeerStatusChanged(name, status) => {
info!("Broker {} changed its status to {:?}", name, status);
}
BrokerPeerUpdate::BestPeerChanged(instance_name, socks_url) => {
// todo: needs to be throttled
match tokio::spawn(set_pruntime_network_with_peer(
socks_url.to_string(),
config.clone(),
))
.await
{
Ok(_) => {
info!(
"Updated pRuntime networking with ({}):{}",
instance_name.as_str(),
socks_url.as_str()
);
}
Err(err) => {
warn!("Failed to set pRuntime networking: {:?}", err);
}
}
ShouldLockBroker(peer) => {
let mut wr = WR.write().await;
wr.handle_lock_peer(peer.clone(), rt_tx.clone()).await;
drop(wr);
}
_ => {}
}
}
}

// async fn handle_peer_update(
// mut rx: BrokerPeerUpdateReceiver,
// rt_tx: WorkerRuntimeChannelMessageSender,
// ) {
// while let Some(u) = rx.recv().await {
// let _ = rt_tx.clone().send(ShouldUpdateBrokerPeer(u)).await;
// }
// }

async fn register_service(mdns: &ServiceDaemon, _ctx: &AsyncRuntimeContext) {
let common_config = &CONFIG.common;
let _worker_config = CONFIG.local_worker();
Expand Down
Loading

0 comments on commit ec97b09

Please sign in to comment.