diff --git a/Cargo.lock b/Cargo.lock index 7008cd4..8c90e82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1387,7 +1387,7 @@ dependencies = [ [[package]] name = "zika" -version = "3.4.0" +version = "3.4.1" dependencies = [ "base64", "bytes", diff --git a/Cargo.toml b/Cargo.toml index fcca30d..73e1a13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zika" -version = "3.4.0" +version = "3.4.1" license = "MIT" description = "IP Tunneling over MQTT" repository = "https://github.com/akiroz/zika" diff --git a/src/remote.rs b/src/remote.rs index 2c030f6..947cbe3 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -1,5 +1,5 @@ use core::time::Duration; -use std::sync::Arc; +use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; use std::ops::Range; use log; @@ -31,6 +31,7 @@ pub struct Remote { clients: Vec, subs: Arc>>, pub on_event: broadcast::Receiver<(usize, Packet)>, + pub online: Arc, } impl Remote { @@ -45,7 +46,9 @@ impl Remote { clients: Vec::with_capacity(broker_opts.len()), subs: subs.clone(), on_event: evt_recv, + online: Arc::new(AtomicBool::new(false)), }; + for (idx, opt) in broker_opts.iter().enumerate() { log::debug!("broker[{}] opts {:?}", idx, opt); let (mqtt_client, mut event_loop) = mqtt::AsyncClient::new(opt.clone(), 128); @@ -95,6 +98,23 @@ impl Remote { }); remote.clients.push(remote_client); } + + // Online checker + let broker_len = remote.clients.len(); + let mut chkr_recv = remote.on_event.resubscribe(); + let chkr_online = remote.online.clone(); + task::spawn(async move { + let mut broker_state: Vec = Vec::with_capacity(broker_len); + loop { + match chkr_recv.recv().await { + Ok((idx, Packet::ConnAck(_))) => broker_state[idx] = true, + Ok((idx, Packet::Disconnect(_))) => broker_state[idx] = false, + _ => {} + } + chkr_online.store(broker_state.iter().any(|up| *up), Ordering::Relaxed); + } + }); + (remote, msg_recv) }