Skip to content

Commit

Permalink
BLE using DuplexStream
Browse files Browse the repository at this point in the history
Working!
  • Loading branch information
lukipuki committed Jan 23, 2024
1 parent e77d168 commit 75073f8
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 4 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ name = "basic_serial"
[[example]]
name = "basic_tcp"

[[example]]
name = "basic_ble"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[build-dependencies]
Expand All @@ -48,3 +51,4 @@ serde_json = { version = "1.0", optional = true }
thiserror = "1.0.48"
uuid = "1.6.1"
btleplug = "0.11.5"
futures = "0.3.30"
43 changes: 43 additions & 0 deletions examples/basic_ble.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/// This example connects to a TCP port on the radio, and prints out all received packets.
/// This can be used with a simulated radio via the Meshtastic Docker firmware image.
/// https://meshtastic.org/docs/software/linux-native#usage-with-docker
extern crate meshtastic;

use std::io::{self, BufRead};

use meshtastic::api::StreamApi;
use meshtastic::utils;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream_api = StreamApi::new();

println!("Enter the MAC address of a BLE device to connect to:");

let stdin = io::stdin();
let entered_address = stdin
.lock()
.lines()
.next()
.expect("Failed to find next line")
.expect("Could not read next line");

let (ble_stream, _handle) = utils::stream::build_ble_stream(entered_address).await?;
let (mut decoded_listener, stream_api) = stream_api.connect(ble_stream).await;

let config_id = utils::generate_rand_id();
let stream_api = stream_api.configure(config_id).await?;

// This loop can be broken with ctrl+c, or by unpowering the radio.
while let Some(decoded) = decoded_listener.recv().await {
println!("Received: {:?}", decoded);
}

// Note that in this specific example, this will only be called when
// the radio is disconnected, as the above loop will never exit.
// Typically you would allow the user to manually kill the loop,
// for example with tokio::select!.
let _stream_api = stream_api.disconnect().await?;

Ok(())
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub mod utils {
/// can also be used to list all available serial ports on the host machine.
pub mod stream {
pub use crate::utils_internal::available_serial_ports;
pub use crate::utils_internal::build_ble_stream;
pub use crate::utils_internal::build_serial_stream;
pub use crate::utils_internal::build_tcp_stream;
}
Expand Down
162 changes: 158 additions & 4 deletions src/utils_internal.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use crate::errors_internal::{BleConnectionError, Error};
use btleplug::api::{Central, Manager as _, Peripheral as _, ScanFilter};
use crate::errors_internal::{BleConnectionError, Error, InternalStreamError};
use btleplug::api::{
Central, Characteristic, Manager as _, Peripheral as _, ScanFilter, ValueNotification,
WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral};
use futures::stream::StreamExt;
use log::error;
use std::time::Duration;
use std::time::UNIX_EPOCH;
use uuid::Uuid;

use rand::{distributions::Standard, prelude::Distribution, Rng};
use tokio::io::DuplexStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_serial::{available_ports, SerialPort, SerialStream};

use crate::connections::stream_api::StreamHandle;
Expand Down Expand Up @@ -199,6 +205,9 @@ pub async fn build_tcp_stream(
}

const MSH_SERVICE: Uuid = Uuid::from_u128(0x6ba1b218_15a8_461f_9fa8_5dcae273eafd);
const FROMRADIO: Uuid = Uuid::from_u128(0x2c55e69e_4993_11ed_b878_0242ac120002);
const TORADIO: Uuid = Uuid::from_u128(0xf75c76d2_129e_4dad_a1dd_7866124401e7);
const FROMNUM: Uuid = Uuid::from_u128(0xed9da18c_a800_4f66_a670_aa7547e34453);

async fn scan_peripherals(adapter: &Adapter) -> Result<Vec<Peripheral>, btleplug::Error> {
adapter
Expand All @@ -211,7 +220,7 @@ async fn scan_peripherals(adapter: &Adapter) -> Result<Vec<Peripheral>, btleplug

/// Finds a BLE radio matching a given name and running meshtastic.
/// It searches for the 'MSH_SERVICE' running on the device.
async fn find_ble_radio(name: String) -> Result<Peripheral, Error> {
async fn find_ble_radio(name: &str) -> Result<Peripheral, Error> {
//TODO: support searching both by a name and by a MAC address
let scan_error_fn = |e: btleplug::Error| Error::StreamBuildError {
source: Box::new(e),
Expand All @@ -231,7 +240,7 @@ async fn find_ble_radio(name: String) -> Result<Peripheral, Error> {
Ok(peripherals) => {
for peripheral in peripherals {
if let Ok(Some(peripheral_properties)) = peripheral.properties().await {
if peripheral_properties.local_name == Some(name.clone()) {
if peripheral_properties.local_name == Some(name.to_owned()) {
return Ok(peripheral);
}
}
Expand All @@ -246,6 +255,151 @@ async fn find_ble_radio(name: String) -> Result<Peripheral, Error> {
})
}

/// Finds the 3 meshtastic characteristics: toradio, fromnum and fromradio. It returns them in this
/// order.
async fn find_characteristics(radio: &Peripheral) -> Result<[Characteristic; 3], Error> {
radio
.discover_services()
.await
.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: "Failed to discover services".to_owned(),
})?;
let characteristics = radio.characteristics();
let find_characteristic = |uuid| {
characteristics
.iter()
.find(|c| c.uuid == uuid)
.ok_or(Error::StreamBuildError {
source: Box::new(BleConnectionError()), // TODO
description: format!("Failed to find characteristic {uuid}"),
})
};

Ok([
find_characteristic(TORADIO)?.clone(),
find_characteristic(FROMNUM)?.clone(),
find_characteristic(FROMRADIO)?.clone(),
])
}

fn parse_u32(data: Vec<u8>) -> Result<u32, Error> {
let parsed_value = u32::from_le_bytes(data.as_slice().try_into().map_err(|e| {
Error::InternalStreamError(InternalStreamError::StreamReadError {
source: Box::new(e),
})
})?);
Ok(parsed_value)
}

pub async fn build_ble_stream(name: String) -> Result<StreamHandle<DuplexStream>, Error> {
let radio = find_ble_radio(&name).await?;
radio.connect().await.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: format!("Failed to connect to the device {name}"),
})?;

let [toradio_char, fromnum_char, fromradio_char] = find_characteristics(&radio).await?;

let (client, mut server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move {
let ble_read_error_fn = |e: btleplug::Error| {
Error::InternalStreamError(InternalStreamError::StreamReadError {
source: Box::new(e),
})
};
let ble_write_error_fn = |e: btleplug::Error| {
Error::InternalStreamError(InternalStreamError::StreamWriteError {
source: Box::new(e),
})
};
let duplex_write_error_fn = |e| {
Error::InternalStreamError(InternalStreamError::StreamWriteError {
source: Box::new(e),
})
};
let mut messages_read =
parse_u32(radio.read(&fromnum_char).await.map_err(ble_read_error_fn)?)?;
let mut msg_count;

let mut buf = [0u8; 1024];
if let Ok(len) = server.read(&mut buf).await {
radio
.write(&toradio_char, &buf[4..len], WriteType::WithResponse)
.await
.map_err(ble_write_error_fn)?;
}
while let Ok(msg) = radio.read(&fromradio_char).await {
if msg.len() == 0 {
break;
}
let msg = format_data_packet(msg.into())?;
server
.write(msg.data())
.await
.map_err(duplex_write_error_fn)?;
}
radio
.subscribe(&fromnum_char)
.await
.map_err(ble_read_error_fn)?;
let mut notifications = radio.notifications().await.map_err(ble_read_error_fn)?;
loop {
// Note: the following `tokio::select` is only half-duplex on the BLE radio. While we
// are reading from the radio, we are not writing to it and vice versa. However, BLE is
// a half-duplex technology, so we wouldn't gain much with a full duplex solution
// anyway.
tokio::select!(
fromnum_data = notifications.next() => {
if let Some(ValueNotification {
uuid: FROMNUM,
value,
}) = fromnum_data {
msg_count = parse_u32(value)?;
for _ in messages_read..msg_count {
let radio_msg = radio.read(&fromradio_char).await;
match radio_msg {
Ok(msg) => {
let msg = format_data_packet(msg.into())?;
server.write(msg.data()).await.map_err(duplex_write_error_fn)?;
messages_read += 1;
},
Err(e) => { // Irrecoverable
return Err(Error::InternalStreamError(
InternalStreamError::StreamReadError {
source: Box::new(e),
},
));
}
}
}
}
},
from_server = server.read(&mut buf) => {
match from_server {
Ok(len) => radio
.write(&toradio_char, &buf[4..len], WriteType::WithResponse)
.await
.map_err(ble_write_error_fn)?,
Err(e) => { // Irrecoverable
return Err(Error::InternalStreamError(
InternalStreamError::StreamWriteError {
source: Box::new(e),
},
));
}
}
}
);
}
});

Ok(StreamHandle {
stream: client,
join_handle: Some(handle),
})
}

/// A helper method to generate random numbers using the `rand` crate.
///
/// This method is intended to be used to generate random id values. This method
Expand Down

0 comments on commit 75073f8

Please sign in to comment.