diff --git a/host/src/channel_manager.rs b/host/src/channel_manager.rs index 9fe5f7ea..be6ba50e 100644 --- a/host/src/channel_manager.rs +++ b/host/src/channel_manager.rs @@ -10,7 +10,7 @@ use embassy_sync::channel::Channel; use embassy_sync::waitqueue::WakerRegistration; use crate::cursor::WriteCursor; -use crate::host::{AclSender, BleHost}; +use crate::host::BleHost; use crate::l2cap::L2capChannel; use crate::packet_pool::{Packet, Pool}; use crate::pdu::Pdu; @@ -204,8 +204,8 @@ impl<'d> ChannelManager<'d> { let mut tx = [0; 18]; // Respond that we accept the channel. - let mut hci = ble.acl(conn, 1).await?; - hci.signal( + ble.l2cap_signal( + conn, req_id, &LeCreditConnRes { mps, @@ -254,8 +254,7 @@ impl<'d> ChannelManager<'d> { mtu, credits, }; - let mut hci = ble.acl(conn, 1).await?; - hci.signal(req_id, &command, &mut tx[..]).await?; + ble.l2cap_signal(conn, req_id, &command, &mut tx[..]).await?; // Wait until a response is accepted. poll_fn(|cx| self.poll_created(conn, idx, ble, Some(cx))).await @@ -543,28 +542,32 @@ impl<'d> ChannelManager<'d> { ) -> Result<(), BleHostError> { let (conn, mps, peer_cid) = self.connected_channel_params(index)?; // The number of packets we'll need to send for this payload - let n_packets = 1 + ((buf.len() as u16).saturating_sub(mps - 2)).div_ceil(mps); + let len = (buf.len() as u16).saturating_add(2); + let n_packets = len.div_ceil(mps); let mut grant = poll_fn(|cx| self.poll_request_to_send(index, n_packets, Some(cx))).await?; - let mut hci = ble.acl(conn, n_packets).await?; + + let mut sender = ble.l2cap(conn, len, n_packets).await?; // Segment using mps let (first, remaining) = buf.split_at(buf.len().min(mps as usize - 2)); let len = encode(first, &mut p_buf[..], peer_cid, Some(buf.len() as u16))?; - hci.send(&p_buf[..len], true).await?; + sender.send(&p_buf[..len]).await?; grant.confirm(1); let chunks = remaining.chunks(mps as usize); for chunk in chunks { let len = encode(chunk, &mut p_buf[..], peer_cid, None)?; - hci.send(&p_buf[..len], true).await?; + sender.send(&p_buf[..len]).await?; grant.confirm(1); } Ok(()) } + // Number of fragments + /// Send the provided buffer over a given l2cap channel. /// /// The buffer will be segmented to the maximum payload size agreed in the opening handshake. @@ -580,7 +583,8 @@ impl<'d> ChannelManager<'d> { let (conn, mps, peer_cid) = self.connected_channel_params(index)?; // The number of packets we'll need to send for this payload - let n_packets = ((buf.len() as u16).saturating_add(2)).div_ceil(mps); + let len = (buf.len() as u16).saturating_add(2); + let n_packets = len.div_ceil(mps); let mut grant = match self.poll_request_to_send(index, n_packets, None) { Poll::Ready(res) => res?, @@ -589,13 +593,14 @@ impl<'d> ChannelManager<'d> { } }; - let mut hci = ble.try_acl(conn, n_packets)?; + // Pre-request + let mut sender = ble.try_l2cap(conn, len, n_packets)?; // Segment using mps let (first, remaining) = buf.split_at(buf.len().min(mps as usize - 2)); let len = encode(first, &mut p_buf[..], peer_cid, Some(buf.len() as u16))?; - hci.try_send(&p_buf[..len], true)?; + sender.try_send(&p_buf[..len])?; grant.confirm(1); let chunks = remaining.chunks(mps as usize); @@ -603,7 +608,7 @@ impl<'d> ChannelManager<'d> { for (i, chunk) in chunks.enumerate() { let len = encode(chunk, &mut p_buf[..], peer_cid, None)?; - hci.try_send(&p_buf[..len], true)?; + sender.try_send(&p_buf[..len])?; grant.confirm(1); } Ok(()) @@ -641,8 +646,7 @@ impl<'d> ChannelManager<'d> { let signal = LeCreditFlowInd { cid, credits }; // Reuse packet buffer for signalling data to save the extra TX buffer - let mut hci = ble.acl(conn, 1).await?; - hci.signal(identifier, &signal, packet.as_mut()).await?; + ble.l2cap_signal(conn, identifier, &signal, packet.as_mut()).await?; self.with_mut(|state| { let chan = &mut state.channels[index.0 as usize]; if chan.state == ChannelState::Connected { @@ -745,7 +749,7 @@ impl<'a, 'd> DisconnectRequest<'a, 'd> { self.handle } - pub async fn send(&self, hci: &mut AclSender<'a, 'd, T>) -> Result<(), BleHostError> { + pub async fn send(&self, host: &BleHost<'_, T>) -> Result<(), BleHostError> { let (state, conn, identifier, dcid, scid) = { let mut state = self.state.borrow_mut(); let identifier = state.next_request_id(); @@ -757,12 +761,12 @@ impl<'a, 'd> DisconnectRequest<'a, 'd> { match state { ChannelState::PeerDisconnecting => { assert_eq!(Some(self.handle), conn); - hci.signal(identifier, &DisconnectionRes { dcid, scid }, &mut tx[..]) + host.l2cap_signal(self.handle, identifier, &DisconnectionRes { dcid, scid }, &mut tx[..]) .await?; } ChannelState::Disconnecting => { assert_eq!(Some(self.handle), conn); - hci.signal(identifier, &DisconnectionReq { dcid, scid }, &mut tx[..]) + host.l2cap_signal(self.handle, identifier, &DisconnectionReq { dcid, scid }, &mut tx[..]) .await?; } _ => {} @@ -1043,4 +1047,7 @@ mod tests { Poll::Ready(Err(BleHostError::BleHost(Error::Disconnected))) )); } + + #[test] + fn channel_fragmentation() {} } diff --git a/host/src/gatt.rs b/host/src/gatt.rs index 804ca348..b4d3e771 100644 --- a/host/src/gatt.rs +++ b/host/src/gatt.rs @@ -373,8 +373,12 @@ impl<'reference, T: Controller, const MAX_SERVICES: usize, const L2CAP_MTU: usiz w.write_hci(&header)?; w.write(req)?; - let mut grant = self.stack.host.acl(self.connection.handle(), 1).await?; - grant.send(w.finish(), true).await?; + let mut grant = self + .stack + .host + .l2cap(self.connection.handle(), w.len() as u16, 1) + .await?; + grant.send(w.finish()).await?; let (h, pdu) = self.response_channel.receive().await; @@ -399,9 +403,8 @@ impl<'reference, C: Controller, const MAX_SERVICES: usize, const L2CAP_MTU: usiz mtu: L2CAP_MTU as u16 - 4, })?; - let mut grant = stack.host.acl(connection.handle(), 1).await?; - - grant.send(w.finish(), true).await?; + let mut grant = stack.host.l2cap(connection.handle(), w.len() as u16, 1).await?; + grant.send(w.finish()).await?; Ok(Self { known_services: RefCell::new(heapless::Vec::new()), diff --git a/host/src/host.rs b/host/src/host.rs index 3a6e92e5..8134d270 100644 --- a/host/src/host.rs +++ b/host/src/host.rs @@ -414,28 +414,87 @@ where Ok(()) } - // Request to send n ACL packets to the HCI controller for a connection - pub(crate) async fn acl(&self, handle: ConnHandle, n: u16) -> Result, BleHostError> { - let grant = poll_fn(|cx| self.connections.poll_request_to_send(handle, n as usize, Some(cx))).await?; - Ok(AclSender { + // Send l2cap signal payload + pub(crate) async fn l2cap_signal( + &self, + conn: ConnHandle, + identifier: u8, + signal: &D, + p_buf: &mut [u8], + ) -> Result<(), BleHostError> { + //trace!( + // "[l2cap] sending control signal (req = {}) signal: {:?}", + // identifier, + // signal + //); + let header = L2capSignalHeader { + identifier, + code: D::code(), + length: signal.size() as u16, + }; + let l2cap = L2capHeader { + channel: D::channel(), + length: header.size() as u16 + header.length, + }; + + let mut w = WriteCursor::new(p_buf); + w.write_hci(&l2cap)?; + w.write_hci(&header)?; + w.write_hci(signal)?; + + let mut sender = self.l2cap(conn, w.len() as u16, 1).await?; + sender.send(w.finish()).await?; + + Ok(()) + } + + // Request to an L2CAP payload of len to the HCI controller for a connection. + // + // This function will request the appropriate number of ACL packets to be sent and + // the returned sender will handle fragmentation. + pub(crate) async fn l2cap( + &self, + handle: ConnHandle, + len: u16, + n_packets: u16, + ) -> Result, BleHostError> { + // Take into account l2cap header. + let acl_max = self.initialized.get().await.acl_max as u16; + let len = len + (4 * n_packets); + let n_acl = len.div_ceil(acl_max); + let grant = poll_fn(|cx| self.connections.poll_request_to_send(handle, n_acl as usize, Some(cx))).await?; + Ok(L2capSender { controller: &self.controller, handle, grant, + fragment_size: acl_max, }) } - // Request to send n ACL packets to the HCI controller for a connection - pub(crate) fn try_acl(&self, handle: ConnHandle, n: u16) -> Result, BleHostError> { - let grant = match self.connections.poll_request_to_send(handle, n as usize, None) { + // Request to an L2CAP payload of len to the HCI controller for a connection. + // + // This function will request the appropriate number of ACL packets to be sent and + // the returned sender will handle fragmentation. + pub(crate) fn try_l2cap( + &self, + handle: ConnHandle, + len: u16, + n_packets: u16, + ) -> Result, BleHostError> { + let acl_max = self.initialized.try_get().map(|i| i.acl_max).unwrap_or(27) as u16; + let len = len + (4 * n_packets); + let n_acl = len.div_ceil(acl_max); + let grant = match self.connections.poll_request_to_send(handle, n_acl as usize, None) { Poll::Ready(res) => res?, Poll::Pending => { return Err(Error::Busy.into()); } }; - Ok(AclSender { + Ok(L2capSender { controller: &self.controller, handle, grant, + fragment_size: acl_max, }) } @@ -864,8 +923,7 @@ impl<'d, C: Controller> ControlRunner<'d, C> { } Either3::Second(request) => { trace!("[host] poll disconnecting channels"); - let mut grant = host.acl(request.handle(), 1).await?; - request.send(&mut grant).await?; + request.send(host).await?; request.confirm(); } Either3::Third(states) => match states { @@ -915,109 +973,65 @@ impl<'d, C: Controller> TxRunner<'d, C> { let params = host.initialized.get().await; loop { let (conn, pdu) = host.connections.outbound().await; - let mut first = true; - for chunk in pdu.as_ref().chunks(params.acl_max) { - match host.acl(conn, 1).await { - Ok(mut sender) => { - if let Err(e) = sender.send(chunk, first).await { - warn!("[host] error sending outbound pdu"); - return Err(e); - } - first = false; - } - Err(e) => { - warn!("[host] error requesting sending outbound pdu"); + match host.l2cap(conn, pdu.len as u16, 1).await { + Ok(mut sender) => { + if let Err(e) = sender.send(pdu.as_ref()).await { + warn!("[host] error sending outbound pdu"); return Err(e); } } + Err(e) => { + warn!("[host] error requesting sending outbound pdu"); + return Err(e); + } } } } } -pub struct AclSender<'a, 'd, T: Controller> { +pub struct L2capSender<'a, 'd, T: Controller> { pub(crate) controller: &'a T, pub(crate) handle: ConnHandle, pub(crate) grant: PacketGrant<'a, 'd>, + pub(crate) fragment_size: u16, } -impl<'a, 'd, T: Controller> AclSender<'a, 'd, T> { - pub(crate) fn try_send(&mut self, pdu: &[u8], first: bool) -> Result<(), BleHostError> +impl<'a, 'd, T: Controller> L2capSender<'a, 'd, T> { + pub(crate) fn try_send(&mut self, pdu: &[u8]) -> Result<(), BleHostError> where T: blocking::Controller, { - let acl = AclPacket::new( - self.handle, - if first { - AclPacketBoundary::FirstNonFlushable - } else { - AclPacketBoundary::Continuing - }, - AclBroadcastFlag::PointToPoint, - pdu, - ); - // info!("Sent ACL {:?}", acl); - match self.controller.try_write_acl_data(&acl) { - Ok(result) => { - self.grant.confirm(1); - Ok(result) - } - Err(blocking::TryError::Busy) => { - warn!("hci: acl data send busy"); - Err(Error::Busy.into()) + let mut pbf = AclPacketBoundary::FirstNonFlushable; + for chunk in pdu.chunks(self.fragment_size as usize) { + let acl = AclPacket::new(self.handle, pbf, AclBroadcastFlag::PointToPoint, chunk); + // info!("Sent ACL {:?}", acl); + match self.controller.try_write_acl_data(&acl) { + Ok(result) => { + self.grant.confirm(1); + } + Err(blocking::TryError::Busy) => { + warn!("hci: acl data send busy"); + return Err(Error::Busy.into()); + } + Err(blocking::TryError::Error(e)) => return Err(BleHostError::Controller(e)), } - Err(blocking::TryError::Error(e)) => Err(BleHostError::Controller(e)), + pbf = AclPacketBoundary::Continuing; } - } - - pub(crate) async fn send(&mut self, pdu: &[u8], first: bool) -> Result<(), BleHostError> { - let acl = AclPacket::new( - self.handle, - if first { - AclPacketBoundary::FirstNonFlushable - } else { - AclPacketBoundary::Continuing - }, - AclBroadcastFlag::PointToPoint, - pdu, - ); - // info!("Sent ACL {:?}", acl); - self.controller - .write_acl_data(&acl) - .await - .map_err(BleHostError::Controller)?; - self.grant.confirm(1); Ok(()) } - pub(crate) async fn signal( - &mut self, - identifier: u8, - signal: &D, - p_buf: &mut [u8], - ) -> Result<(), BleHostError> { - //trace!( - // "[l2cap] sending control signal (req = {}) signal: {:?}", - // identifier, - // signal - //); - let header = L2capSignalHeader { - identifier, - code: D::code(), - length: signal.size() as u16, - }; - let l2cap = L2capHeader { - channel: D::channel(), - length: header.size() as u16 + header.length, - }; - - let mut w = WriteCursor::new(p_buf); - w.write_hci(&l2cap)?; - w.write_hci(&header)?; - w.write_hci(signal)?; - - self.send(w.finish(), true).await?; - + pub(crate) async fn send(&mut self, pdu: &[u8]) -> Result<(), BleHostError> { + let mut pbf = AclPacketBoundary::FirstNonFlushable; + for chunk in pdu.chunks(self.fragment_size as usize) { + let acl = AclPacket::new(self.handle, pbf, AclBroadcastFlag::PointToPoint, chunk); + // info!("Sent ACL {:?}", acl); + self.controller + .write_acl_data(&acl) + .await + .map_err(BleHostError::Controller)?; + self.grant.confirm(1); + pbf = AclPacketBoundary::Continuing; + } Ok(()) } }