From d7e237fbdbf0bb6ace5f532a57a62aad135a3542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Pol=C3=A1=C4=8Dek?= Date: Fri, 2 Feb 2024 13:38:07 +0100 Subject: [PATCH] Add BLE using DuplexStream --- Cargo.toml | 9 ++- examples/basic_ble.rs | 41 ++++++++++++ src/connections/ble_handler.rs | 111 ++++++++++++++++++++------------- src/lib.rs | 2 + src/utils_internal.rs | 98 +++++++++++++++++++++++++++++ 5 files changed, 216 insertions(+), 45 deletions(-) create mode 100644 examples/basic_ble.rs diff --git a/Cargo.toml b/Cargo.toml index 2a14321..754fce9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,11 @@ edition = "2021" doctest = false [features] -default = ["serde"] +default = ["serde", "bluetooth-le"] serde = ["dep:serde", "dep:serde_json"] ts-gen = ["serde", "dep:specta"] -bluetooth-le = ["dep:uuid","dep:btleplug"] +bluetooth-le = ["dep:uuid", "dep:btleplug", "dep:futures"] [[example]] name = "basic_serial" @@ -34,7 +34,9 @@ name = "message_filtering" name = "generate_typescript_types" required-features = ["ts-gen"] -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[example]] +name = "basic_ble" +required-features = ["bluetooth-le"] [build-dependencies] prost-build = "0.11.1" @@ -56,6 +58,7 @@ serde_json = { version = "1.0", optional = true } thiserror = "1.0.48" uuid = { version = "1.6.1", optional = true } btleplug = { version = "0.11.5", optional = true } +futures = { version = "0.3.30", optional = true } [dev-dependencies] fern = { version = "0.6.2", features = ["colored"] } diff --git a/examples/basic_ble.rs b/examples/basic_ble.rs new file mode 100644 index 0000000..81898e8 --- /dev/null +++ b/examples/basic_ble.rs @@ -0,0 +1,41 @@ +/// This example connects via Bluetooth LE to the radio, and prints out all received packets. +extern crate meshtastic; + +use std::io::{self, BufRead}; + +use meshtastic::api::StreamApi; +use meshtastic::utils; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let stream_api = StreamApi::new(); + + println!("Enter the short name of a BLE device to connect to:"); + + let stdin = io::stdin(); + let entered_name = stdin + .lock() + .lines() + .next() + .expect("Failed to find next line") + .expect("Could not read next line"); + + let ble_stream = utils::stream::build_ble_stream(entered_name).await?; + let (mut decoded_listener, stream_api) = stream_api.connect(ble_stream).await; + + let config_id = utils::generate_rand_id(); + let stream_api = stream_api.configure(config_id).await?; + + // This loop can be broken with ctrl+c, disabling bluetooth or by turning off the radio. + while let Some(decoded) = decoded_listener.recv().await { + println!("Received: {:?}", decoded); + } + + // Note that in this specific example, this will only be called when + // the radio is disconnected, as the above loop will never exit. + // Typically you would allow the user to manually kill the loop, + // for example with tokio::select!. + let _stream_api = stream_api.disconnect().await?; + + Ok(()) +} diff --git a/src/connections/ble_handler.rs b/src/connections/ble_handler.rs index df78203..fd5d330 100644 --- a/src/connections/ble_handler.rs +++ b/src/connections/ble_handler.rs @@ -3,8 +3,10 @@ use btleplug::api::{ ValueNotification, WriteType, }; use btleplug::platform::{Adapter, Manager, Peripheral}; +use futures::stream::StreamExt; use futures_util::stream::BoxStream; use log::error; +use std::future; use uuid::Uuid; use crate::errors_internal::{BleConnectionError, Error, InternalStreamError}; @@ -22,6 +24,26 @@ pub struct BleHandler { fromnum_char: Characteristic, } +#[derive(PartialEq)] +pub enum BleEvent { + Disconnected, +} + +pub trait Ble { + fn write_to_radio( + &self, + buffer: &[u8], + ) -> impl std::future::Future> + Send; + fn read_from_radio(&self) -> impl std::future::Future, Error>> + Send; + fn read_fromnum(&self) -> impl std::future::Future> + Send; + fn notifications( + &self, + ) -> impl std::future::Future, Error>> + Send; + fn adapter_events( + &self, + ) -> impl std::future::Future, Error>> + Send; +} + #[allow(dead_code)] impl BleHandler { pub async fn new(name: String) -> Result { @@ -64,7 +86,7 @@ impl BleHandler { let adapters = manager.adapters().await.map_err(scan_error_fn)?; for adapter in &adapters { - let peripherals = Self::scan_peripherals(&adapter).await; + let peripherals = Self::scan_peripherals(adapter).await; match peripherals { Err(e) => { error!("Error while scanning for meshtastic peripherals: {e:?}"); @@ -119,7 +141,23 @@ impl BleHandler { ]) } - pub async fn write_to_radio(&self, buffer: &[u8]) -> Result<(), Error> { + fn ble_read_error_fn(e: btleplug::Error) -> Error { + Error::InternalStreamError(InternalStreamError::StreamReadError { + source: Box::new(e), + }) + } + fn parse_u32(data: Vec) -> Result { + let data = data.as_slice().try_into().map_err(|e| { + Error::InternalStreamError(InternalStreamError::StreamReadError { + source: Box::new(e), + }) + })?; + Ok(u32::from_le_bytes(data)) + } +} + +impl Ble for BleHandler { + async fn write_to_radio(&self, buffer: &[u8]) -> Result<(), Error> { self.radio // TODO: remove the skipping of the first 4 bytes .write(&self.toradio_char, &buffer[4..], WriteType::WithResponse) @@ -131,29 +169,14 @@ impl BleHandler { }) } - fn ble_read_error_fn(e: btleplug::Error) -> Error { - Error::InternalStreamError(InternalStreamError::StreamReadError { - source: Box::new(e), - }) - } - - pub async fn read_from_radio(&self) -> Result, Error> { + async fn read_from_radio(&self) -> Result, Error> { self.radio .read(&self.fromradio_char) .await .map_err(Self::ble_read_error_fn) } - fn parse_u32(data: Vec) -> Result { - let parsed_value = u32::from_le_bytes(data.as_slice().try_into().map_err(|e| { - Error::InternalStreamError(InternalStreamError::StreamReadError { - source: Box::new(e), - }) - })?); - Ok(parsed_value) - } - - pub async fn read_fromnum(&self) -> Result { + async fn read_fromnum(&self) -> Result { let data = self .radio .read(&self.fromnum_char) @@ -162,41 +185,45 @@ impl BleHandler { Self::parse_u32(data) } - pub async fn notifications(&self) -> Result, Error> { + async fn notifications(&self) -> Result, Error> { self.radio .subscribe(&self.fromnum_char) .await .map_err(Self::ble_read_error_fn)?; - self.radio + let notification_stream = self + .radio .notifications() .await - .map_err(Self::ble_read_error_fn) - } + .map_err(Self::ble_read_error_fn)?; - pub async fn filter_map(notification: ValueNotification) -> Option { - match notification { - ValueNotification { - uuid: FROMNUM, - value, - } => Some(Self::parse_u32(value).unwrap()), - _ => None, - } + Ok(Box::pin(notification_stream.filter_map( + |notification| match notification { + ValueNotification { + uuid: FROMNUM, + value, + } => future::ready(Self::parse_u32(value).ok()), + _ => future::ready(None), + }, + ))) } - pub async fn adapter_events(&self) -> Result, Error> { - self.adapter + async fn adapter_events(&self) -> Result, Error> { + let stream = self + .adapter .events() .await .map_err(|e| Error::StreamBuildError { source: Box::new(e), - description: format!("Failed to listen to device events"), - }) - } - - pub fn is_disconnected_event(&self, event: Option) -> bool { - if let Some(CentralEvent::DeviceDisconnected(peripheral_id)) = event { - return self.radio.id() == peripheral_id; - } - return false; + description: "Failed to listen to device events".to_owned(), + })?; + let id = self.radio.id(); + Ok(Box::pin(stream.filter_map(move |event| { + if let CentralEvent::DeviceDisconnected(peripheral_id) = event { + if id == peripheral_id { + return future::ready(Some(BleEvent::Disconnected)); + } + } + future::ready(None) + }))) } } diff --git a/src/lib.rs b/src/lib.rs index 1eb107e..e3982b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,6 +128,8 @@ pub mod utils { /// can also be used to list all available serial ports on the host machine. pub mod stream { pub use crate::utils_internal::available_serial_ports; + #[cfg(feature = "bluetooth-le")] + pub use crate::utils_internal::build_ble_stream; pub use crate::utils_internal::build_serial_stream; pub use crate::utils_internal::build_tcp_stream; } diff --git a/src/utils_internal.rs b/src/utils_internal.rs index 171a599..f24a858 100644 --- a/src/utils_internal.rs +++ b/src/utils_internal.rs @@ -1,8 +1,16 @@ +#![allow(clippy::mut_range_bound)] + +#[cfg(feature = "bluetooth-le")] +use crate::connections::ble_handler::{Ble, BleHandler}; use crate::errors_internal::Error; +#[cfg(feature = "bluetooth-le")] +use futures::stream::StreamExt; use std::time::Duration; use std::time::UNIX_EPOCH; use rand::{distributions::Standard, prelude::Distribution, Rng}; +#[cfg(feature = "bluetooth-le")] +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; use tokio_serial::{available_ports, SerialPort, SerialStream}; use crate::connections::stream_api::StreamHandle; @@ -194,6 +202,96 @@ pub async fn build_tcp_stream( Ok(StreamHandle::from_stream(stream)) } +#[cfg(feature = "bluetooth-le")] +pub async fn build_ble_stream(name: String) -> Result, Error> { + let ble_handler = BleHandler::new(name).await?; + build_ble_stream_impl(ble_handler).await +} + +#[cfg(feature = "bluetooth-le")] +pub async fn build_ble_stream_impl( + ble_handler: B, +) -> Result, Error> { + use crate::{connections::ble_handler::BleEvent, errors_internal::InternalStreamError}; + // `client` will be returned by this function, server is the opposite end of the channel and + // it's directly connected to a `BleHandler`. + let (client, mut server) = tokio::io::duplex(1024); + let handle = tokio::spawn(async move { + let duplex_write_error_fn = |e| { + Error::InternalStreamError(InternalStreamError::StreamWriteError { + source: Box::new(e), + }) + }; + let mut read_messages_count = ble_handler.read_fromnum().await?; + + let mut buf = [0u8; 1024]; + if let Ok(len) = server.read(&mut buf).await { + ble_handler.write_to_radio(&buf[..len]).await? + } + while let Ok(msg) = ble_handler.read_from_radio().await { + if msg.is_empty() { + break; + } + let msg = format_data_packet(msg.into())?; + server + .write(msg.data()) + .await + .map_err(duplex_write_error_fn)?; + } + + let mut notification_stream = ble_handler.notifications().await?; + let mut adapter_events = ble_handler.adapter_events().await?; + loop { + // Note: the following `tokio::select` is only half-duplex on the BLE radio. While we + // are reading from the radio, we are not writing to it and vice versa. However, BLE is + // a half-duplex technology, so we wouldn't gain much with a full duplex solution + // anyway. + tokio::select!( + // Data from device, forward it to the user + notification = notification_stream.next() => { + if let Some(msg_count) = notification { + for _ in read_messages_count..msg_count { + let radio_msg = ble_handler.read_from_radio().await; + match radio_msg { + Ok(msg) => { + let msg = format_data_packet(msg.into())?; + server.write(msg.data()).await.map_err(duplex_write_error_fn)?; + read_messages_count += 1; + }, + Err(e) => return Err(e), + } + } + } + }, + // Data from user, forward it to the device + from_server = server.read(&mut buf) => { + match from_server { + Ok(len) => ble_handler.write_to_radio(&buf[..len]).await?, + Err(e) => { // Irrecoverable + return Err(Error::InternalStreamError( + InternalStreamError::StreamWriteError { + source: Box::new(e), + }, + )); + } + } + }, + event = adapter_events.next() => { + if Some(BleEvent::Disconnected) == event { + println!("Disconnected"); + return Err(Error::InternalStreamError(InternalStreamError::ConnectionLost)) + } + } + ); + } + }); + + Ok(StreamHandle { + stream: client, + join_handle: Some(handle), + }) +} + /// A helper method to generate random numbers using the `rand` crate. /// /// This method is intended to be used to generate random id values. This method