Skip to content

Commit

Permalink
Add BLE using DuplexStream
Browse files Browse the repository at this point in the history
  • Loading branch information
lukipuki committed Dec 28, 2024
1 parent 1e37217 commit c039e86
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 45 deletions.
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ edition = "2021"
doctest = false

[features]
default = ["serde"]
default = ["serde", "bluetooth-le"]

serde = ["dep:serde", "dep:serde_json"]
ts-gen = ["serde", "dep:specta"]
bluetooth-le = ["dep:uuid","dep:btleplug"]
bluetooth-le = ["dep:uuid", "dep:btleplug", "dep:futures"]

[[example]]
name = "basic_serial"
Expand All @@ -34,7 +34,9 @@ name = "message_filtering"
name = "generate_typescript_types"
required-features = ["ts-gen"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[example]]
name = "basic_ble"
required-features = ["bluetooth-le"]

[build-dependencies]
prost-build = "0.11.1"
Expand All @@ -56,6 +58,7 @@ serde_json = { version = "1.0", optional = true }
thiserror = "1.0.48"
uuid = { version = "1.6.1", optional = true }
btleplug = { version = "0.11.5", optional = true }
futures = { version = "0.3.30", optional = true }

[dev-dependencies]
fern = { version = "0.6.2", features = ["colored"] }
Expand Down
41 changes: 41 additions & 0 deletions examples/basic_ble.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/// This example connects via Bluetooth LE to the radio, and prints out all received packets.
extern crate meshtastic;

use std::io::{self, BufRead};

use meshtastic::api::StreamApi;
use meshtastic::utils;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream_api = StreamApi::new();

println!("Enter the short name of a BLE device to connect to:");

let stdin = io::stdin();
let entered_name = stdin
.lock()
.lines()
.next()
.expect("Failed to find next line")
.expect("Could not read next line");

let ble_stream = utils::stream::build_ble_stream(entered_name).await?;
let (mut decoded_listener, stream_api) = stream_api.connect(ble_stream).await;

let config_id = utils::generate_rand_id();
let stream_api = stream_api.configure(config_id).await?;

// This loop can be broken with ctrl+c, disabling bluetooth or by turning off the radio.
while let Some(decoded) = decoded_listener.recv().await {
println!("Received: {:?}", decoded);
}

// Note that in this specific example, this will only be called when
// the radio is disconnected, as the above loop will never exit.
// Typically you would allow the user to manually kill the loop,
// for example with tokio::select!.
let _stream_api = stream_api.disconnect().await?;

Ok(())
}
111 changes: 69 additions & 42 deletions src/connections/ble_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use btleplug::api::{
ValueNotification, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral};
use futures::stream::StreamExt;
use futures_util::stream::BoxStream;
use log::error;
use std::future;
use uuid::Uuid;

use crate::errors_internal::{BleConnectionError, Error, InternalStreamError};
Expand All @@ -22,6 +24,26 @@ pub struct BleHandler {
fromnum_char: Characteristic,
}

#[derive(PartialEq)]
pub enum AdapterEvent {
Disconnected,
}

pub trait Ble {
fn write_to_radio(
&self,
buffer: &[u8],
) -> impl std::future::Future<Output = Result<(), Error>> + Send;
fn read_from_radio(&self) -> impl std::future::Future<Output = Result<Vec<u8>, Error>> + Send;
fn read_fromnum(&self) -> impl std::future::Future<Output = Result<u32, Error>> + Send;
fn notifications(
&self,
) -> impl std::future::Future<Output = Result<BoxStream<u32>, Error>> + Send;
fn adapter_events(
&self,
) -> impl std::future::Future<Output = Result<BoxStream<AdapterEvent>, Error>> + Send;
}

#[allow(dead_code)]
impl BleHandler {
pub async fn new(name: String) -> Result<Self, Error> {
Expand Down Expand Up @@ -64,7 +86,7 @@ impl BleHandler {
let adapters = manager.adapters().await.map_err(scan_error_fn)?;

for adapter in &adapters {
let peripherals = Self::scan_peripherals(&adapter).await;
let peripherals = Self::scan_peripherals(adapter).await;
match peripherals {
Err(e) => {
error!("Error while scanning for meshtastic peripherals: {e:?}");
Expand Down Expand Up @@ -119,7 +141,23 @@ impl BleHandler {
])
}

pub async fn write_to_radio(&self, buffer: &[u8]) -> Result<(), Error> {
fn ble_read_error_fn(e: btleplug::Error) -> Error {
Error::InternalStreamError(InternalStreamError::StreamReadError {
source: Box::new(e),
})
}
fn parse_u32(data: Vec<u8>) -> Result<u32, Error> {
let data = data.as_slice().try_into().map_err(|e| {
Error::InternalStreamError(InternalStreamError::StreamReadError {
source: Box::new(e),
})
})?;
Ok(u32::from_le_bytes(data))
}
}

impl Ble for BleHandler {
async fn write_to_radio(&self, buffer: &[u8]) -> Result<(), Error> {
self.radio
// TODO: remove the skipping of the first 4 bytes
.write(&self.toradio_char, &buffer[4..], WriteType::WithResponse)
Expand All @@ -131,29 +169,14 @@ impl BleHandler {
})
}

fn ble_read_error_fn(e: btleplug::Error) -> Error {
Error::InternalStreamError(InternalStreamError::StreamReadError {
source: Box::new(e),
})
}

pub async fn read_from_radio(&self) -> Result<Vec<u8>, Error> {
async fn read_from_radio(&self) -> Result<Vec<u8>, Error> {
self.radio
.read(&self.fromradio_char)
.await
.map_err(Self::ble_read_error_fn)
}

fn parse_u32(data: Vec<u8>) -> Result<u32, Error> {
let parsed_value = u32::from_le_bytes(data.as_slice().try_into().map_err(|e| {
Error::InternalStreamError(InternalStreamError::StreamReadError {
source: Box::new(e),
})
})?);
Ok(parsed_value)
}

pub async fn read_fromnum(&self) -> Result<u32, Error> {
async fn read_fromnum(&self) -> Result<u32, Error> {
let data = self
.radio
.read(&self.fromnum_char)
Expand All @@ -162,41 +185,45 @@ impl BleHandler {
Self::parse_u32(data)
}

pub async fn notifications(&self) -> Result<BoxStream<ValueNotification>, Error> {
async fn notifications(&self) -> Result<BoxStream<u32>, Error> {
self.radio
.subscribe(&self.fromnum_char)
.await
.map_err(Self::ble_read_error_fn)?;
self.radio
let notification_stream = self
.radio
.notifications()
.await
.map_err(Self::ble_read_error_fn)
}
.map_err(Self::ble_read_error_fn)?;

pub async fn filter_map(notification: ValueNotification) -> Option<u32> {
match notification {
ValueNotification {
uuid: FROMNUM,
value,
} => Some(Self::parse_u32(value).unwrap()),
_ => None,
}
Ok(Box::pin(notification_stream.filter_map(
|notification| match notification {
ValueNotification {
uuid: FROMNUM,
value,
} => future::ready(Self::parse_u32(value).ok()),
_ => future::ready(None),
},
)))
}

pub async fn adapter_events(&self) -> Result<BoxStream<CentralEvent>, Error> {
self.adapter
async fn adapter_events(&self) -> Result<BoxStream<AdapterEvent>, Error> {
let stream = self
.adapter
.events()
.await
.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: format!("Failed to listen to device events"),
})
}

pub fn is_disconnected_event(&self, event: Option<CentralEvent>) -> bool {
if let Some(CentralEvent::DeviceDisconnected(peripheral_id)) = event {
return self.radio.id() == peripheral_id;
}
return false;
description: "Failed to listen to device events".to_owned(),
})?;
let id = self.radio.id();
Ok(Box::pin(stream.filter_map(move |event| {
if let CentralEvent::DeviceDisconnected(peripheral_id) = event {
if id == peripheral_id {
return future::ready(Some(AdapterEvent::Disconnected));
}
}
future::ready(None)
})))
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub mod utils {
/// can also be used to list all available serial ports on the host machine.
pub mod stream {
pub use crate::utils_internal::available_serial_ports;
#[cfg(feature = "bluetooth-le")]
pub use crate::utils_internal::build_ble_stream;
pub use crate::utils_internal::build_serial_stream;
pub use crate::utils_internal::build_tcp_stream;
}
Expand Down
87 changes: 87 additions & 0 deletions src/utils_internal.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
#![allow(clippy::mut_range_bound)]

#[cfg(feature = "bluetooth-le")]
use crate::connections::ble_handler::{Ble, BleHandler};
use crate::errors_internal::Error;
#[cfg(feature = "bluetooth-le")]
use futures::stream::StreamExt;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use rand::{distributions::Standard, prelude::Distribution, Rng};
#[cfg(feature = "bluetooth-le")]
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
use tokio_serial::{available_ports, SerialPort, SerialStream};

use crate::connections::stream_api::StreamHandle;
Expand Down Expand Up @@ -194,6 +202,85 @@ pub async fn build_tcp_stream(
Ok(StreamHandle::from_stream(stream))
}

#[cfg(feature = "bluetooth-le")]
pub async fn build_ble_stream(name: String) -> Result<StreamHandle<DuplexStream>, Error> {
let ble_handler = BleHandler::new(name).await?;
build_ble_stream_impl(ble_handler).await
}

#[cfg(feature = "bluetooth-le")]
pub async fn build_ble_stream_impl<B: Ble + Send + 'static>(
ble_handler: B,
) -> Result<StreamHandle<DuplexStream>, Error> {
use crate::{connections::ble_handler::AdapterEvent, errors_internal::InternalStreamError};
// `client` will be returned by this function, server is the opposite end of the channel and
// it's directly connected to a `BleHandler`.
let (client, mut server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move {
let duplex_write_error_fn = |e| {
Error::InternalStreamError(InternalStreamError::StreamWriteError {
source: Box::new(e),
})
};
let mut read_messages_count = ble_handler.read_fromnum().await?;

let mut buf = [0u8; 1024];
if let Ok(len) = server.read(&mut buf).await {
ble_handler.write_to_radio(&buf[..len]).await?
}
while let Ok(msg) = ble_handler.read_from_radio().await {
if msg.is_empty() {
break;
}
let msg = format_data_packet(msg.into())?;
server
.write(msg.data())
.await
.map_err(duplex_write_error_fn)?;
}

let mut notification_stream = ble_handler.notifications().await?;
let mut adapter_events = ble_handler.adapter_events().await?;
loop {
// Note: the following `tokio::select` is only half-duplex on the BLE radio. While we
// are reading from the radio, we are not writing to it and vice versa. However, BLE is
// a half-duplex technology, so we wouldn't gain much with a full duplex solution
// anyway.
tokio::select!(
// Data from device, forward it to the user
notification = notification_stream.next() => {
if let Some(avail_msg_count) = notification {
for _ in read_messages_count..avail_msg_count {
let radio_msg = ble_handler.read_from_radio().await?;
let msg = format_data_packet(radio_msg.into())?;
server.write(msg.data()).await.map_err(duplex_write_error_fn)?;
read_messages_count += 1;
}
} else {
// TODO: failed to parse notification. Should we log it? Return error to user?
}
},
// Data from user, forward it to the device
from_server = server.read(&mut buf) => {
let len = from_server.map_err(duplex_write_error_fn)?;
ble_handler.write_to_radio(&buf[..len]).await?;
},
event = adapter_events.next() => {
if Some(AdapterEvent::Disconnected) == event {
println!("Disconnected");
return Err(Error::InternalStreamError(InternalStreamError::ConnectionLost))
}
}
);
}
});

Ok(StreamHandle {
stream: client,
join_handle: Some(handle),
})
}

/// A helper method to generate random numbers using the `rand` crate.
///
/// This method is intended to be used to generate random id values. This method
Expand Down

0 comments on commit c039e86

Please sign in to comment.