From 75073f8261c4c6ee97bdde15a55f754e9d6072be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Pol=C3=A1=C4=8Dek?= Date: Tue, 16 Jan 2024 11:32:00 +0100 Subject: [PATCH] BLE using DuplexStream Working! --- Cargo.toml | 4 ++ examples/basic_ble.rs | 43 +++++++++++ src/lib.rs | 1 + src/utils_internal.rs | 162 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 examples/basic_ble.rs diff --git a/Cargo.toml b/Cargo.toml index 7de47e8..0b1230b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,9 @@ name = "basic_serial" [[example]] name = "basic_tcp" +[[example]] +name = "basic_ble" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [build-dependencies] @@ -48,3 +51,4 @@ serde_json = { version = "1.0", optional = true } thiserror = "1.0.48" uuid = "1.6.1" btleplug = "0.11.5" +futures = "0.3.30" diff --git a/examples/basic_ble.rs b/examples/basic_ble.rs new file mode 100644 index 0000000..acaea63 --- /dev/null +++ b/examples/basic_ble.rs @@ -0,0 +1,43 @@ +/// This example connects to a TCP port on the radio, and prints out all received packets. +/// This can be used with a simulated radio via the Meshtastic Docker firmware image. +/// https://meshtastic.org/docs/software/linux-native#usage-with-docker +extern crate meshtastic; + +use std::io::{self, BufRead}; + +use meshtastic::api::StreamApi; +use meshtastic::utils; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let stream_api = StreamApi::new(); + + println!("Enter the MAC address of a BLE device to connect to:"); + + let stdin = io::stdin(); + let entered_address = stdin + .lock() + .lines() + .next() + .expect("Failed to find next line") + .expect("Could not read next line"); + + let (ble_stream, _handle) = utils::stream::build_ble_stream(entered_address).await?; + let (mut decoded_listener, stream_api) = stream_api.connect(ble_stream).await; + + let config_id = utils::generate_rand_id(); + let stream_api = stream_api.configure(config_id).await?; + + // This loop can be broken with ctrl+c, or by unpowering the radio. + while let Some(decoded) = decoded_listener.recv().await { + println!("Received: {:?}", decoded); + } + + // Note that in this specific example, this will only be called when + // the radio is disconnected, as the above loop will never exit. + // Typically you would allow the user to manually kill the loop, + // for example with tokio::select!. + let _stream_api = stream_api.disconnect().await?; + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 1b8c028..1fe6199 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,6 +126,7 @@ pub mod utils { /// can also be used to list all available serial ports on the host machine. pub mod stream { pub use crate::utils_internal::available_serial_ports; + pub use crate::utils_internal::build_ble_stream; pub use crate::utils_internal::build_serial_stream; pub use crate::utils_internal::build_tcp_stream; } diff --git a/src/utils_internal.rs b/src/utils_internal.rs index bc99066..8aa5f51 100644 --- a/src/utils_internal.rs +++ b/src/utils_internal.rs @@ -1,12 +1,18 @@ -use crate::errors_internal::{BleConnectionError, Error}; -use btleplug::api::{Central, Manager as _, Peripheral as _, ScanFilter}; +use crate::errors_internal::{BleConnectionError, Error, InternalStreamError}; +use btleplug::api::{ + Central, Characteristic, Manager as _, Peripheral as _, ScanFilter, ValueNotification, + WriteType, +}; use btleplug::platform::{Adapter, Manager, Peripheral}; +use futures::stream::StreamExt; use log::error; use std::time::Duration; use std::time::UNIX_EPOCH; use uuid::Uuid; use rand::{distributions::Standard, prelude::Distribution, Rng}; +use tokio::io::DuplexStream; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_serial::{available_ports, SerialPort, SerialStream}; use crate::connections::stream_api::StreamHandle; @@ -199,6 +205,9 @@ pub async fn build_tcp_stream( } const MSH_SERVICE: Uuid = Uuid::from_u128(0x6ba1b218_15a8_461f_9fa8_5dcae273eafd); +const FROMRADIO: Uuid = Uuid::from_u128(0x2c55e69e_4993_11ed_b878_0242ac120002); +const TORADIO: Uuid = Uuid::from_u128(0xf75c76d2_129e_4dad_a1dd_7866124401e7); +const FROMNUM: Uuid = Uuid::from_u128(0xed9da18c_a800_4f66_a670_aa7547e34453); async fn scan_peripherals(adapter: &Adapter) -> Result, btleplug::Error> { adapter @@ -211,7 +220,7 @@ async fn scan_peripherals(adapter: &Adapter) -> Result, btleplug /// Finds a BLE radio matching a given name and running meshtastic. /// It searches for the 'MSH_SERVICE' running on the device. -async fn find_ble_radio(name: String) -> Result { +async fn find_ble_radio(name: &str) -> Result { //TODO: support searching both by a name and by a MAC address let scan_error_fn = |e: btleplug::Error| Error::StreamBuildError { source: Box::new(e), @@ -231,7 +240,7 @@ async fn find_ble_radio(name: String) -> Result { Ok(peripherals) => { for peripheral in peripherals { if let Ok(Some(peripheral_properties)) = peripheral.properties().await { - if peripheral_properties.local_name == Some(name.clone()) { + if peripheral_properties.local_name == Some(name.to_owned()) { return Ok(peripheral); } } @@ -246,6 +255,151 @@ async fn find_ble_radio(name: String) -> Result { }) } +/// Finds the 3 meshtastic characteristics: toradio, fromnum and fromradio. It returns them in this +/// order. +async fn find_characteristics(radio: &Peripheral) -> Result<[Characteristic; 3], Error> { + radio + .discover_services() + .await + .map_err(|e| Error::StreamBuildError { + source: Box::new(e), + description: "Failed to discover services".to_owned(), + })?; + let characteristics = radio.characteristics(); + let find_characteristic = |uuid| { + characteristics + .iter() + .find(|c| c.uuid == uuid) + .ok_or(Error::StreamBuildError { + source: Box::new(BleConnectionError()), // TODO + description: format!("Failed to find characteristic {uuid}"), + }) + }; + + Ok([ + find_characteristic(TORADIO)?.clone(), + find_characteristic(FROMNUM)?.clone(), + find_characteristic(FROMRADIO)?.clone(), + ]) +} + +fn parse_u32(data: Vec) -> Result { + let parsed_value = u32::from_le_bytes(data.as_slice().try_into().map_err(|e| { + Error::InternalStreamError(InternalStreamError::StreamReadError { + source: Box::new(e), + }) + })?); + Ok(parsed_value) +} + +pub async fn build_ble_stream(name: String) -> Result, Error> { + let radio = find_ble_radio(&name).await?; + radio.connect().await.map_err(|e| Error::StreamBuildError { + source: Box::new(e), + description: format!("Failed to connect to the device {name}"), + })?; + + let [toradio_char, fromnum_char, fromradio_char] = find_characteristics(&radio).await?; + + let (client, mut server) = tokio::io::duplex(1024); + let handle = tokio::spawn(async move { + let ble_read_error_fn = |e: btleplug::Error| { + Error::InternalStreamError(InternalStreamError::StreamReadError { + source: Box::new(e), + }) + }; + let ble_write_error_fn = |e: btleplug::Error| { + Error::InternalStreamError(InternalStreamError::StreamWriteError { + source: Box::new(e), + }) + }; + let duplex_write_error_fn = |e| { + Error::InternalStreamError(InternalStreamError::StreamWriteError { + source: Box::new(e), + }) + }; + let mut messages_read = + parse_u32(radio.read(&fromnum_char).await.map_err(ble_read_error_fn)?)?; + let mut msg_count; + + let mut buf = [0u8; 1024]; + if let Ok(len) = server.read(&mut buf).await { + radio + .write(&toradio_char, &buf[4..len], WriteType::WithResponse) + .await + .map_err(ble_write_error_fn)?; + } + while let Ok(msg) = radio.read(&fromradio_char).await { + if msg.len() == 0 { + break; + } + let msg = format_data_packet(msg.into())?; + server + .write(msg.data()) + .await + .map_err(duplex_write_error_fn)?; + } + radio + .subscribe(&fromnum_char) + .await + .map_err(ble_read_error_fn)?; + let mut notifications = radio.notifications().await.map_err(ble_read_error_fn)?; + loop { + // Note: the following `tokio::select` is only half-duplex on the BLE radio. While we + // are reading from the radio, we are not writing to it and vice versa. However, BLE is + // a half-duplex technology, so we wouldn't gain much with a full duplex solution + // anyway. + tokio::select!( + fromnum_data = notifications.next() => { + if let Some(ValueNotification { + uuid: FROMNUM, + value, + }) = fromnum_data { + msg_count = parse_u32(value)?; + for _ in messages_read..msg_count { + let radio_msg = radio.read(&fromradio_char).await; + match radio_msg { + Ok(msg) => { + let msg = format_data_packet(msg.into())?; + server.write(msg.data()).await.map_err(duplex_write_error_fn)?; + messages_read += 1; + }, + Err(e) => { // Irrecoverable + return Err(Error::InternalStreamError( + InternalStreamError::StreamReadError { + source: Box::new(e), + }, + )); + } + } + } + } + }, + from_server = server.read(&mut buf) => { + match from_server { + Ok(len) => radio + .write(&toradio_char, &buf[4..len], WriteType::WithResponse) + .await + .map_err(ble_write_error_fn)?, + Err(e) => { // Irrecoverable + return Err(Error::InternalStreamError( + InternalStreamError::StreamWriteError { + source: Box::new(e), + }, + )); + } + } + } + ); + } + }); + + Ok(StreamHandle { + stream: client, + join_handle: Some(handle), + }) +} + /// A helper method to generate random numbers using the `rand` crate. /// /// This method is intended to be used to generate random id values. This method