Skip to content

Commit

Permalink
Add BLE using DuplexStream
Browse files Browse the repository at this point in the history
  • Loading branch information
lukipuki committed Sep 30, 2024
1 parent 50cac09 commit 8f5867b
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 3 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default = ["serde"]

serde = ["dep:serde", "dep:serde_json"]
ts-gen = ["serde", "dep:specta"]
bluetooth-le = ["dep:uuid","dep:btleplug"]
bluetooth-le = ["dep:uuid", "dep:btleplug", "dep:futures"]

[[example]]
name = "basic_serial"
Expand All @@ -33,7 +33,8 @@ name = "message_filtering"
[[example]]
name = "generate_typescript_types"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[example]]
name = "basic_ble"

[build-dependencies]
prost-build = "0.11.1"
Expand All @@ -55,6 +56,7 @@ serde_json = { version = "1.0", optional = true }
thiserror = "1.0.48"
uuid = { version = "1.6.1", optional = true }
btleplug = { version = "0.11.5", optional = true }
futures = { version = "0.3.30", optional = true }

[dev-dependencies]
fern = { version = "0.6.2", features = ["colored"] }
Expand Down
43 changes: 43 additions & 0 deletions examples/basic_ble.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/// This example connects to a TCP port on the radio, and prints out all received packets.
/// This can be used with a simulated radio via the Meshtastic Docker firmware image.
/// https://meshtastic.org/docs/software/linux-native#usage-with-docker
extern crate meshtastic;

use std::io::{self, BufRead};

use meshtastic::api::StreamApi;
use meshtastic::utils;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream_api = StreamApi::new();

println!("Enter the MAC address of a BLE device to connect to:");

let stdin = io::stdin();
let entered_address = stdin
.lock()
.lines()
.next()
.expect("Failed to find next line")
.expect("Could not read next line");

let ble_stream = utils::stream::build_ble_stream(entered_address).await?;
let (mut decoded_listener, stream_api) = stream_api.connect(ble_stream).await;

let config_id = utils::generate_rand_id();
let stream_api = stream_api.configure(config_id).await?;

// This loop can be broken with ctrl+c, or by unpowering the radio.
while let Some(decoded) = decoded_listener.recv().await {
println!("Received: {:?}", decoded);
}

// Note that in this specific example, this will only be called when
// the radio is disconnected, as the above loop will never exit.
// Typically you would allow the user to manually kill the loop,
// for example with tokio::select!.
let _stream_api = stream_api.disconnect().await?;

Ok(())
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub mod utils {
/// can also be used to list all available serial ports on the host machine.
pub mod stream {
pub use crate::utils_internal::available_serial_ports;
#[cfg(feature = "bluetooth-le")]
pub use crate::utils_internal::build_ble_stream;
pub use crate::utils_internal::build_serial_stream;
pub use crate::utils_internal::build_tcp_stream;
}
Expand Down
92 changes: 91 additions & 1 deletion src/utils_internal.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
use crate::errors_internal::Error;
#[cfg(feature = "bluetooth-le")]
use crate::connections::ble_handler::BleHandler;
use crate::errors_internal::{Error, InternalStreamError};
#[cfg(feature = "bluetooth-le")]
use futures::stream::StreamExt;
#[cfg(feature = "bluetooth-le")]
use std::pin::pin;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use rand::{distributions::Standard, prelude::Distribution, Rng};
#[cfg(feature = "bluetooth-le")]
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
use tokio_serial::{available_ports, SerialPort, SerialStream};

use crate::connections::stream_api::StreamHandle;
Expand Down Expand Up @@ -194,6 +202,88 @@ pub async fn build_tcp_stream(
Ok(StreamHandle::from_stream(stream))
}

#[cfg(feature = "bluetooth-le")]
pub async fn build_ble_stream(name: String) -> Result<StreamHandle<DuplexStream>, Error> {
let ble_handler = BleHandler::new(name).await?;

let (client, mut server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move {
let duplex_write_error_fn = |e| {
Error::InternalStreamError(InternalStreamError::StreamWriteError {
source: Box::new(e),
})
};
let mut messages_read = ble_handler.read_fromnum().await?;

let mut buf = [0u8; 1024];
if let Ok(len) = server.read(&mut buf).await {
ble_handler.write_to_radio(&buf[..len]).await?
}
while let Ok(msg) = ble_handler.read_from_radio().await {
if msg.len() == 0 {
break;
}
let msg = format_data_packet(msg.into())?;
server
.write(msg.data())
.await
.map_err(duplex_write_error_fn)?;
}

let mut notification_stream = pin!(ble_handler
.notifications()
.await?
.filter_map(BleHandler::filter_map));
let mut adapter_events = ble_handler.adapter_events().await?;
loop {
// Note: the following `tokio::select` is only half-duplex on the BLE radio. While we
// are reading from the radio, we are not writing to it and vice versa. However, BLE is
// a half-duplex technology, so we wouldn't gain much with a full duplex solution
// anyway.
tokio::select!(
notification = notification_stream.next() => {
if let Some(msg_count) = notification {
for _ in messages_read..msg_count {
let radio_msg = ble_handler.read_from_radio().await;
match radio_msg {
Ok(msg) => {
let msg = format_data_packet(msg.into())?;
server.write(msg.data()).await.map_err(duplex_write_error_fn)?;
messages_read += 1;
},
Err(e) => return Err(e),
}
}
}
},
from_server = server.read(&mut buf) => {
match from_server {
Ok(len) => ble_handler.write_to_radio(&buf[..len]).await?,
Err(e) => { // Irrecoverable
return Err(Error::InternalStreamError(
InternalStreamError::StreamWriteError {
source: Box::new(e),
},
));
}
}
},
event = adapter_events.next() => {
if ble_handler.is_disconnected_event(event) {
println!("Disconnected");
return Err(Error::InternalStreamError(InternalStreamError::ConnectionLost))
}
}
);
}
});

Ok(StreamHandle {
stream: client,
join_handle: Some(handle),
})
}

/// A helper method to generate random numbers using the `rand` crate.
///
/// This method is intended to be used to generate random id values. This method
Expand Down

0 comments on commit 8f5867b

Please sign in to comment.