feat(broker): local worker peer lifecycle
krhougs committed Oct 18, 2022
1 parent ec97b09 commit 8fcce80
Showing 12 changed files with 377 additions and 102 deletions.
205 changes: 205 additions & 0 deletions src/bin/service_broker/local_worker.rs
@@ -0,0 +1,205 @@
use crate::local_worker::KnownLocalWorkerStatus::*;
use crate::local_worker::LocalWorkerManagerChannelMessage::*;
use crate::mgmt::local_worker::LocalWorkerIdentity;
use log::{error, info, warn};
use service_network::config::{BROKER_PEER_DEAD_COUNT, BROKER_PEER_LOST_COUNT};
use std::collections::BTreeMap;
use std::net::Ipv4Addr;
use std::str::FromStr;
use tokio::sync::mpsc::{channel, Receiver, Sender};

pub enum LocalWorkerManagerChannelMessage {
    ShouldCheckPeerHealth,
    ReceivedKeepAlive(LocalWorkerIdentity),
}

pub type LocalWorkerManagerChannelMessageSender = Sender<LocalWorkerManagerChannelMessage>;
pub type LocalWorkerManagerChannelMessageReceiver = Receiver<LocalWorkerManagerChannelMessage>;

pub type LocalWorkerMap = BTreeMap<String, KnownLocalWorker>;

pub async fn local_worker_manager(
    _tx: LocalWorkerManagerChannelMessageSender,
    mut rx: LocalWorkerManagerChannelMessageReceiver,
) {
    let mut lw_vec_keys: Vec<String> = Vec::new();
    let mut lw_map: LocalWorkerMap = BTreeMap::new();

    // `recv` returns `None` once every sender is dropped; wrapping this in an
    // outer `loop` would busy-spin on a closed channel.
    while let Some(msg) = rx.recv().await {
        match msg {
            ShouldCheckPeerHealth => {
                check_peer_health(&mut lw_map, &lw_vec_keys);
            }
            ReceivedKeepAlive(lw) => {
                let key = lw.public_key.as_str();
                if let Some(klw) = lw_map.get_mut(key) {
                    match klw.status {
                        // A keep-alive from a dead worker means it came back:
                        // re-register it from scratch.
                        Dead => {
                            lw_vec_keys = create_local_worker(&mut lw_map, lw);
                        }
                        _ => {
                            update_local_worker(klw, lw);
                        }
                    }
                } else {
                    lw_vec_keys = create_local_worker(&mut lw_map, lw);
                }
            }
        }
    }
}

fn create_local_worker(lw_map: &mut LocalWorkerMap, lw: LocalWorkerIdentity) -> Vec<String> {
    let LocalWorkerIdentity {
        instance_name,
        instance_id,
        address_string,
        public_key,
        public_port,
    } = lw;

    let key = public_key.clone();
    // `unwrap_or_else` avoids building the panic message on the happy path.
    let address = Ipv4Addr::from_str(address_string.as_str()).unwrap_or_else(|_| {
        panic!(
            "Invalid IP address for worker {}: {}",
            instance_name.as_str(),
            address_string.as_str()
        )
    });

    info!(
        "Hello new worker({}/{}).",
        instance_name.as_str(),
        public_key.as_str()
    );

    let ret = KnownLocalWorker {
        status: Active,
        address,
        instance_name,
        instance_id,
        address_string,
        public_key,
        public_port,
        lost_count: 0,
        lost_mark: false,
    };
    lw_map.insert(key, ret);

    // Rebuild the list of live (non-dead) worker keys used by the health check.
    lw_map
        .iter()
        .filter_map(|(key, val)| match val.status {
            Dead => None,
            _ => Some(key.clone()),
        })
        .collect::<Vec<String>>()
}

fn update_local_worker(klw: &mut KnownLocalWorker, lw: LocalWorkerIdentity) {
    let LocalWorkerIdentity {
        instance_name,
        instance_id,
        address_string,
        public_key,
        ..
    } = lw;

    if instance_id != klw.instance_id {
        warn!(
            "Worker {} has changed instance id, it may have restarted.",
            instance_name.as_str()
        )
    }
    klw.instance_id = instance_id;

    if address_string != klw.address_string {
        warn!(
            "Worker {} has changed its IP address, there may be IP address collisions.",
            instance_name.as_str()
        )
    }
    if public_key != klw.public_key {
        error!(
            "[FATAL] Worker {} has changed public key, please check your network environment.",
            instance_name.as_str()
        )
    }

    match klw.status {
        Active => {
            // Clear the miss counter along with the mark; otherwise `lost_count`
            // (a u8) keeps growing across health checks even for a healthy
            // worker and eventually overflows.
            klw.lost_mark = false;
            klw.lost_count = 0;
        }
        Lost => {
            klw.status = Active;
            klw.lost_count = 0;
            klw.lost_mark = false;
            info!(
                "Worker {} has recovered from lost state.",
                instance_name.as_str()
            )
        }
        Dead => {
            warn!(
                "[FATAL] This is a bug, `update_local_worker` ran for dead worker {}.",
                instance_name.as_str()
            )
        }
    }
}

fn check_peer_health(lw_map: &mut LocalWorkerMap, lw_vec_keys: &[String]) {
    for k in lw_vec_keys {
        if let Some(lw) = lw_map.get_mut(k) {
            match lw.status {
                Active => {
                    if lw.lost_mark && (lw.lost_count > BROKER_PEER_LOST_COUNT) {
                        lw.status = Lost;
                        warn!(
                            "Worker peer unreachable: {}/{}",
                            lw.instance_name, lw.public_key
                        );
                    }
                    // The mark stays set until the next keep-alive clears it;
                    // saturating_add keeps the u8 counter from overflowing.
                    lw.lost_mark = true;
                    lw.lost_count = lw.lost_count.saturating_add(1);
                }
                Lost => {
                    if lw.lost_count > BROKER_PEER_DEAD_COUNT {
                        lw.status = Dead;
                        warn!("Worker peer dead: {}/{}", lw.instance_name, lw.public_key);
                    }
                    lw.lost_mark = true;
                    lw.lost_count = lw.lost_count.saturating_add(1);
                }
                Dead => {
                    // Dead workers are only revived by a fresh keep-alive.
                }
            }
        }
    }
}

#[derive(Clone, Debug)]
pub enum KnownLocalWorkerStatus {
    Active,
    Lost,
    Dead,
}

#[derive(Clone, Debug)]
pub struct KnownLocalWorker {
    pub status: KnownLocalWorkerStatus,
    pub address: Ipv4Addr,
    pub address_string: String,
    pub public_port: u16,
    pub public_key: String,
    pub instance_name: String,
    pub instance_id: String,
    pub lost_count: u8,
    pub lost_mark: bool,
}
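
The health check drives each peer through Active → Lost → Dead: every tick sets `lost_mark` and bumps `lost_count`, and only a keep-alive clears them. A minimal test sketch of that decay — not part of this commit, and assuming the thresholds are small integers with `BROKER_PEER_LOST_COUNT < BROKER_PEER_DEAD_COUNT`:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn worker_decays_without_keepalives() {
        let mut lw_map: LocalWorkerMap = BTreeMap::new();
        lw_map.insert(
            "pk".to_string(),
            KnownLocalWorker {
                status: Active,
                address: Ipv4Addr::new(127, 0, 0, 1),
                address_string: "127.0.0.1".to_string(),
                public_port: 19350, // hypothetical port
                public_key: "pk".to_string(),
                instance_name: "worker-1".to_string(),
                instance_id: "id-1".to_string(),
                lost_count: 0,
                lost_mark: false,
            },
        );
        let keys = vec!["pk".to_string()];

        // With no keep-alives arriving, each check leaves lost_mark set and
        // increments lost_count, so the peer crosses the Lost threshold and
        // then the Dead threshold.
        for _ in 0..(BROKER_PEER_DEAD_COUNT as usize + 2) {
            check_peer_health(&mut lw_map, &keys);
        }
        assert!(matches!(lw_map["pk"].status, Dead));
    }
}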
42 changes: 30 additions & 12 deletions src/bin/service_broker/main.rs
@@ -1,15 +1,23 @@
+mod local_worker;
 mod mgmt;
 mod outbound;
 
+use crate::local_worker::{
+    local_worker_manager, LocalWorkerManagerChannelMessage, LocalWorkerManagerChannelMessageSender,
+};
+use crate::LocalWorkerManagerChannelMessage::ShouldCheckPeerHealth;
 use env_logger::{Builder as LoggerBuilder, Target};
-use lazy_static::lazy_static;
+use futures::future::try_join_all;
 use log::{debug, info};
 use mdns_sd::{ServiceDaemon, ServiceInfo};
-use service_network::config::{PeerConfig, PeerRole};
+use service_network::config::{PeerConfig, PeerRole, BROKER_HEALTH_CHECK_INTERVAL};
 use service_network::peer::{my_ipv4_interfaces, SERVICE_PSN_BROKER};
 use service_network::runtime::AsyncRuntimeContext;
 use std::collections::HashMap;
 use std::net::Ipv4Addr;
+use std::time::Duration;
+use tokio::sync::mpsc::channel;
+use tokio::time::sleep;
 
 #[macro_use]
 extern crate lazy_static;
@@ -40,20 +48,23 @@ async fn main() {
     );
     debug!("Starting service broker with config: {:?}", &*CONFIG);
 
-    tokio::spawn(broker()).await.expect("Broker panic!");
+    let (tx, rx) = channel::<LocalWorkerManagerChannelMessage>(1024);
+
+    let async_handles = vec![
+        tokio::spawn(broker()),
+        tokio::spawn(outbound::start(&RT_CTX)),
+        tokio::spawn(mgmt::start_server(tx.clone(), &RT_CTX, &CONFIG)),
+        tokio::spawn(local_worker_manager(tx.clone(), rx)),
+        tokio::spawn(check_peer_health_loop(tx.clone())),
+    ];
+    try_join_all(async_handles).await.expect("main failed");
 }
 
 async fn broker() {
-    let pm = &RT_CTX.peer_manager;
+    // let pm = &RT_CTX.peer_manager;
     let mdns = ServiceDaemon::new().expect("Could not create service daemon");
 
     register_service(&mdns, &RT_CTX).await;
 
-    tokio::join!(
-        mgmt::start_server(&RT_CTX, &CONFIG),
-        pm.browse_local_workers(&mdns, &RT_CTX),
-        outbound::start(&RT_CTX)
-    );
+    // pm.browse_local_workers(&mdns, &RT_CTX).await;
 }

@@ -67,7 +78,7 @@ async fn register_service(mdns: &ServiceDaemon, _ctx: &AsyncRuntimeContext) {
     // mp => management port
     // c => cost
     // oa => outbound_bind_addresses
-    // i => inbound_http_server_accessible_address_prefix
+    // o => inbound_http_server_accessible_address_prefix
     let service_info = ServiceInfo::new(
         SERVICE_PSN_BROKER,
         common_config.instance_name.as_str(),
@@ -102,3 +113,10 @@ async fn register_service(mdns: &ServiceDaemon, _ctx: &AsyncRuntimeContext) {
         &service_info
     );
 }
+
+async fn check_peer_health_loop(tx: LocalWorkerManagerChannelMessageSender) {
+    loop {
+        sleep(Duration::from_millis(BROKER_HEALTH_CHECK_INTERVAL as u64)).await;
+        let _ = tx.send(ShouldCheckPeerHealth).await;
+    }
+}
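
For intuition on the timeline: `ShouldCheckPeerHealth` fires every `BROKER_HEALTH_CHECK_INTERVAL` milliseconds, and a silent worker is declared lost or dead only once the corresponding count threshold is crossed. A back-of-the-envelope sketch with hypothetical constant values (the real ones live in `service_network::config`):

// Hypothetical numbers, for intuition only.
const BROKER_HEALTH_CHECK_INTERVAL: u64 = 1_000; // ms
const BROKER_PEER_LOST_COUNT: u8 = 4;
const BROKER_PEER_DEAD_COUNT: u8 = 16;

fn main() {
    // The threshold is compared before the counter is incremented, so a
    // silent worker turns Lost on check LOST_COUNT + 2 and Dead on check
    // DEAD_COUNT + 2.
    let lost_after_ms = (BROKER_PEER_LOST_COUNT as u64 + 2) * BROKER_HEALTH_CHECK_INTERVAL;
    let dead_after_ms = (BROKER_PEER_DEAD_COUNT as u64 + 2) * BROKER_HEALTH_CHECK_INTERVAL;
    println!("lost after ~{lost_after_ms} ms, dead after ~{dead_after_ms} ms");
}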
18 changes: 0 additions & 18 deletions src/bin/service_broker/mgmt.rs

This file was deleted.

31 changes: 31 additions & 0 deletions src/bin/service_broker/mgmt/local_worker.rs
@@ -0,0 +1,31 @@
use crate::mgmt::{BrokerMgmtShared, MyIdentity};
use crate::LocalWorkerManagerChannelMessage::ReceivedKeepAlive;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{extract::Extension, Json};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Serialize, Deserialize)]
pub struct LocalWorkerIdentity {
    pub instance_name: String,
    pub instance_id: String,
    pub address_string: String,
    pub public_port: u16,
    pub public_key: String,
}

pub async fn handle_keepalive(
    Extension(shared): Extension<Arc<BrokerMgmtShared>>,
    Json(lwi): Json<LocalWorkerIdentity>,
) -> impl IntoResponse {
    let config = &shared.config;
    let _ = shared.tx.send(ReceivedKeepAlive(lwi)).await;
    // Reply with 418 plus this broker's identity.
    (
        StatusCode::IM_A_TEAPOT,
        Json(MyIdentity {
            instance_name: config.common.instance_name.to_string(),
            instance_id: config.instance_id.to_string(),
        }),
    )
}
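
On the wire a keep-alive is a plain JSON PUT. A hypothetical worker-side call, sketched with the `reqwest` and `serde_json` crates (not dependencies of this repo, and `reqwest` needs its `json` feature), assuming the broker's management port is 19351:

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let res = reqwest::Client::new()
        .put("http://127.0.0.1:19351/v0/local_worker/keepalive")
        .json(&json!({
            "instance_name": "worker-1",
            "instance_id": "id-1",
            "address_string": "192.168.1.10",
            "public_port": 19350,
            "public_key": "pk",
        }))
        .send()
        .await?;
    // The handler answers 418 I'm a Teapot, carrying the broker's identity.
    assert_eq!(res.status(), reqwest::StatusCode::IM_A_TEAPOT);
    println!("broker identity: {}", res.text().await?);
    Ok(())
}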
60 changes: 60 additions & 0 deletions src/bin/service_broker/mgmt/mod.rs
@@ -0,0 +1,60 @@
pub mod local_worker;

use crate::LocalWorkerManagerChannelMessageSender;
use axum::routing::put;
use axum::{Extension, Router};
use hyper::Server;
use log::info;
use serde::{Deserialize, Serialize};
use service_network::config::PeerConfig;
use service_network::runtime::AsyncRuntimeContext;
use std::sync::Arc;

pub struct BrokerMgmtShared {
    pub tx: LocalWorkerManagerChannelMessageSender,
    pub config: PeerConfig,
}

pub async fn start_server(
    tx: LocalWorkerManagerChannelMessageSender,
    _ctx: &AsyncRuntimeContext,
    config: &'static PeerConfig,
) {
    let shared = BrokerMgmtShared {
        tx,
        config: config.clone(),
    };
    tokio::spawn(async move {
        let router = create_router();
        // TODO: add RSA identity for peer management api
        let bind_addr = format!("0.0.0.0:{}", config.common.mgmt_port);
        info!("Starting management API on {}...", &bind_addr);
        let bind_addr = &bind_addr.parse().unwrap();
        let router = router.layer(Extension(Arc::new(shared)));
        Server::bind(bind_addr)
            .serve(router.into_make_service())
            .await
            .unwrap();
    })
    .await
    .expect("Failed to start management server");
}

fn create_router() -> Router {
    // Internal API v0
    Router::new().route(
        "/v0/local_worker/keepalive",
        put(local_worker::handle_keepalive),
    )
}

#[derive(Serialize, Deserialize)]
pub struct MyIdentity {
    instance_name: String,
    instance_id: String,
}