From 080034d319f5ef7cae2a954541797d3e8f9cec31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Pol=C3=A1=C4=8Dek?= Date: Fri, 2 Feb 2024 13:38:07 +0100 Subject: [PATCH] Add BLE using DuplexStream --- Cargo.toml | 6 ++- examples/basic_ble.rs | 41 +++++++++++++++++++ src/lib.rs | 2 + src/utils_internal.rs | 92 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 examples/basic_ble.rs diff --git a/Cargo.toml b/Cargo.toml index 9c9b2c1..bd9081d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ default = ["serde"] serde = ["dep:serde", "dep:serde_json"] ts-gen = ["serde", "dep:specta"] -bluetooth-le = ["dep:uuid","dep:btleplug"] +bluetooth-le = ["dep:uuid", "dep:btleplug", "dep:futures"] [[example]] name = "basic_serial" @@ -33,7 +33,8 @@ name = "message_filtering" [[example]] name = "generate_typescript_types" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[example]] +name = "basic_ble" [build-dependencies] prost-build = "0.11.1" @@ -55,6 +56,7 @@ serde_json = { version = "1.0", optional = true } thiserror = "1.0.48" uuid = { version = "1.6.1", optional = true } btleplug = { version = "0.11.5", optional = true } +futures = { version = "0.3.30", optional = true } [dev-dependencies] fern = { version = "0.6.2", features = ["colored"] } diff --git a/examples/basic_ble.rs b/examples/basic_ble.rs new file mode 100644 index 0000000..9509e38 --- /dev/null +++ b/examples/basic_ble.rs @@ -0,0 +1,41 @@ +/// This example connects via Bluetooth LE to the radio, and prints out all received packets. +extern crate meshtastic; + +use std::io::{self, BufRead}; + +use meshtastic::api::StreamApi; +use meshtastic::utils; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let stream_api = StreamApi::new(); + + println!("Enter the MAC address of a BLE device to connect to:"); + + let stdin = io::stdin(); + let entered_address = stdin + .lock() + .lines() + .next() + .expect("Failed to find next line") + .expect("Could not read next line"); + + let ble_stream = utils::stream::build_ble_stream(entered_address).await?; + let (mut decoded_listener, stream_api) = stream_api.connect(ble_stream).await; + + let config_id = utils::generate_rand_id(); + let stream_api = stream_api.configure(config_id).await?; + + // This loop can be broken with ctrl+c, disabling bluetooth or by unpowering the radio. + while let Some(decoded) = decoded_listener.recv().await { + println!("Received: {:?}", decoded); + } + + // Note that in this specific example, this will only be called when + // the radio is disconnected, as the above loop will never exit. + // Typically you would allow the user to manually kill the loop, + // for example with tokio::select!. + let _stream_api = stream_api.disconnect().await?; + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 1eb107e..e3982b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,6 +128,8 @@ pub mod utils { /// can also be used to list all available serial ports on the host machine. pub mod stream { pub use crate::utils_internal::available_serial_ports; + #[cfg(feature = "bluetooth-le")] + pub use crate::utils_internal::build_ble_stream; pub use crate::utils_internal::build_serial_stream; pub use crate::utils_internal::build_tcp_stream; } diff --git a/src/utils_internal.rs b/src/utils_internal.rs index 404ab2c..fd93db1 100644 --- a/src/utils_internal.rs +++ b/src/utils_internal.rs @@ -1,8 +1,18 @@ +#[cfg(feature = "bluetooth-le")] +use crate::connections::ble_handler::BleHandler; use crate::errors_internal::Error; +#[cfg(feature = "bluetooth-le")] +use crate::errors_internal::InternalStreamError; +#[cfg(feature = "bluetooth-le")] +use futures::stream::StreamExt; +#[cfg(feature = "bluetooth-le")] +use std::pin::pin; use std::time::Duration; use std::time::UNIX_EPOCH; use rand::{distributions::Standard, prelude::Distribution, Rng}; +#[cfg(feature = "bluetooth-le")] +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; use tokio_serial::{available_ports, SerialPort, SerialStream}; use crate::connections::stream_api::StreamHandle; @@ -194,6 +204,88 @@ pub async fn build_tcp_stream( Ok(StreamHandle::from_stream(stream)) } +#[cfg(feature = "bluetooth-le")] +pub async fn build_ble_stream(name: String) -> Result, Error> { + let ble_handler = BleHandler::new(name).await?; + + let (client, mut server) = tokio::io::duplex(1024); + let handle = tokio::spawn(async move { + let duplex_write_error_fn = |e| { + Error::InternalStreamError(InternalStreamError::StreamWriteError { + source: Box::new(e), + }) + }; + let mut messages_read = ble_handler.read_fromnum().await?; + + let mut buf = [0u8; 1024]; + if let Ok(len) = server.read(&mut buf).await { + ble_handler.write_to_radio(&buf[..len]).await? + } + while let Ok(msg) = ble_handler.read_from_radio().await { + if msg.len() == 0 { + break; + } + let msg = format_data_packet(msg.into())?; + server + .write(msg.data()) + .await + .map_err(duplex_write_error_fn)?; + } + + let mut notification_stream = pin!(ble_handler + .notifications() + .await? + .filter_map(BleHandler::filter_map)); + let mut adapter_events = ble_handler.adapter_events().await?; + loop { + // Note: the following `tokio::select` is only half-duplex on the BLE radio. While we + // are reading from the radio, we are not writing to it and vice versa. However, BLE is + // a half-duplex technology, so we wouldn't gain much with a full duplex solution + // anyway. + tokio::select!( + notification = notification_stream.next() => { + if let Some(msg_count) = notification { + for _ in messages_read..msg_count { + let radio_msg = ble_handler.read_from_radio().await; + match radio_msg { + Ok(msg) => { + let msg = format_data_packet(msg.into())?; + server.write(msg.data()).await.map_err(duplex_write_error_fn)?; + messages_read += 1; + }, + Err(e) => return Err(e), + } + } + } + }, + from_server = server.read(&mut buf) => { + match from_server { + Ok(len) => ble_handler.write_to_radio(&buf[..len]).await?, + Err(e) => { // Irrecoverable + return Err(Error::InternalStreamError( + InternalStreamError::StreamWriteError { + source: Box::new(e), + }, + )); + } + } + }, + event = adapter_events.next() => { + if ble_handler.is_disconnected_event(event) { + println!("Disconnected"); + return Err(Error::InternalStreamError(InternalStreamError::ConnectionLost)) + } + } + ); + } + }); + + Ok(StreamHandle { + stream: client, + join_handle: Some(handle), + }) +} + /// A helper method to generate random numbers using the `rand` crate. /// /// This method is intended to be used to generate random id values. This method