diff --git a/README.md b/README.md index b8ce826..f8e94d1 100644 --- a/README.md +++ b/README.md @@ -13,4 +13,9 @@ runs forever attempting to maintain a connection to the MQTT broker. It includes automatically subscribing to some topics when connected to the broker and registering a last will message to be published if the connection to the broker it lost. -Has some basic support for the different MQTT QoS levels. +The different MQTT QoS levels are supported to a certain extent. In most cases a QoS of 0 is used by +default which means your message may never make it to the broker. This is particularly true in the +case where the network is disconnected or the broker is unreachable in which case you will get no +error or other warning after publishing a message. If you need that you can set a higher QoS level. +This crate will not automatically re-send messages that fail to be delivered however you will get an +error if the broker does not acknowledge messages within a certain time (currently 2 seconds). diff --git a/src/buffer.rs b/src/buffer.rs index a36d167..b721e19 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -72,11 +72,6 @@ impl Buffer { pub fn available(&self) -> usize { N - self.cursor } - - /// Resets the buffer discarding any previously written bytes. - pub(crate) fn reset(&mut self) { - self.cursor = 0; - } } impl Deref for Buffer { diff --git a/src/homeassistant/light.rs b/src/homeassistant/light.rs index 1f9b60d..47cd4f1 100644 --- a/src/homeassistant/light.rs +++ b/src/homeassistant/light.rs @@ -4,11 +4,9 @@ use core::{ops::Deref, str}; use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; use crate::{ + fmt::Debug2Format, homeassistant::{binary_sensor::BinarySensorState, ser::List, Component}, - Error, - Payload, - Publishable, - Topic, + Error, Payload, Publishable, Topic, }; #[derive(Serialize)] @@ -129,7 +127,7 @@ impl<'a> LightState<'a> { let parsed: LedPayload<'a> = match payload.deserialize_json() { Ok(p) => p, Err(e) => { - warn!("Failed to deserialize packet: {:?}", e); + warn!("Failed to deserialize packet: {:?}", Debug2Format(&e)); if let Ok(s) = str::from_utf8(payload) { trace!("{}", s); } diff --git a/src/io.rs b/src/io.rs index 688ad6d..4d68bd4 100644 --- a/src/io.rs +++ b/src/io.rs @@ -9,45 +9,21 @@ use embassy_net::{ }; use embassy_sync::{ blocking_mutex::raw::CriticalSectionRawMutex, - mutex::Mutex, pubsub::{PubSubChannel, Subscriber, WaitResult}, - signal::Signal, }; use embassy_time::Timer; use embedded_io_async::Write; use mqttrs::{ - decode_slice, - Connect, - ConnectReturnCode, - LastWill, - Packet, - Pid, - Protocol, - Publish, - QoS, - QosPid, + decode_slice, Connect, ConnectReturnCode, LastWill, Packet, Pid, Protocol, Publish, QoS, QosPid, }; use crate::{ - device_id, - fmt::Debug2Format, - Buffer, - ControlMessage, - Error, - MqttMessage, - Payload, - Publishable, - Topic, - TopicString, - CONFIRMATION_TIMEOUT, - DATA_CHANNEL, - DEFAULT_BACKOFF, + device_id, fmt::Debug2Format, queue::LossyQueue, ControlMessage, Error, MqttMessage, Payload, + Publishable, Topic, TopicString, CONFIRMATION_TIMEOUT, DATA_CHANNEL, DEFAULT_BACKOFF, RESET_BACKOFF, }; -static WRITE_BUFFER: Mutex> = Mutex::new(Buffer::new()); -static WRITE_PENDING: Signal = Signal::new(); -static WRITE_COMPLETE: Signal = Signal::new(); +static SEND_QUEUE: LossyQueue = LossyQueue::new(); pub(crate) static CONTROL_CHANNEL: PubSubChannel = PubSubChannel::new(); @@ -92,27 +68,19 @@ mod atomic16 { } } -pub(crate) async fn send_packet(packet: Packet<'_>) -> Result<(), Error> { - loop { - trace!("Waiting for data to be written"); - WRITE_COMPLETE.wait().await; - - { - let mut buffer = WRITE_BUFFER.lock().await; - trace!("Encoding packet"); +pub(crate) fn send_packet(packet: Packet<'_>) -> Result<(), Error> { + let mut buffer = Payload::new(); + trace!("Encoding packet"); - match buffer.encode_packet(&packet) { - Ok(()) => { - trace!("Signaling data ready"); - WRITE_PENDING.signal(()); - return Ok(()); - } - Err(mqttrs::Error::WriteZero) => {} - Err(_) => { - error!("Failed to send packet"); - return Err(Error::PacketError); - } - } + match buffer.encode_packet(&packet) { + Ok(()) => { + trace!("Sending packet"); + SEND_QUEUE.push(buffer); + Ok(()) + } + Err(_) => { + error!("Failed to send packet"); + Err(Error::PacketError) } } } @@ -174,7 +142,7 @@ pub(crate) async fn publish( payload, }); - send_packet(packet).await?; + send_packet(packet)?; if let Some(expected_pid) = pid { wait_for_publish(subscriber, expected_pid).await @@ -325,10 +293,10 @@ where match publish.qospid { mqttrs::QosPid::AtMostOnce => {} mqttrs::QosPid::AtLeastOnce(pid) => { - send_packet(Packet::Puback(pid)).await?; + send_packet(Packet::Puback(pid))?; } mqttrs::QosPid::ExactlyOnce(pid) => { - send_packet(Packet::Pubrec(pid)).await?; + send_packet(Packet::Pubrec(pid))?; } } } @@ -337,9 +305,9 @@ where } Packet::Pubrec(pid) => { controller.publish_immediate(ControlMessage::Published(pid)); - send_packet(Packet::Pubrel(pid)).await?; + send_packet(Packet::Pubrel(pid))?; } - Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid)).await?, + Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid))?, Packet::Pubcomp(_) => {} Packet::Suback(suback) => { @@ -378,76 +346,57 @@ where } async fn write_loop(&self, mut writer: TcpWriter<'_>) { - // Clear out any old data. - { - let mut buffer = WRITE_BUFFER.lock().await; - buffer.reset(); - WRITE_PENDING.reset(); - - let mut last_will_topic = TopicString::new(); - let mut last_will_payload = Payload::new(); - - let last_will = self.last_will.as_ref().and_then(|p| { - if p.write_topic(&mut last_will_topic).is_ok() - && p.write_payload(&mut last_will_payload).is_ok() - { - Some(LastWill { - topic: &last_will_topic, - message: &last_will_payload, - qos: p.qos(), - retain: p.retain(), - }) - } else { - None - } - }); - - // Send our connection request. - if buffer - .encode_packet(&Packet::Connect(Connect { - protocol: Protocol::MQTT311, - keep_alive: 60, - client_id: device_id(), - clean_session: true, - last_will, - username: self.username, - password: self.password.map(|s| s.as_bytes()), - })) - .is_err() - { - error!("Failed to encode connection packet"); - return; - } + let mut buffer = Payload::new(); - if let Err(e) = writer.write_all(&buffer).await { - error!("Failed to send connection packet: {:?}", e); - return; - } + let mut last_will_topic = TopicString::new(); + let mut last_will_payload = Payload::new(); - buffer.reset(); + let last_will = self.last_will.as_ref().and_then(|p| { + if p.write_topic(&mut last_will_topic).is_ok() + && p.write_payload(&mut last_will_payload).is_ok() + { + Some(LastWill { + topic: &last_will_topic, + message: &last_will_payload, + qos: p.qos(), + retain: p.retain(), + }) + } else { + None + } + }); + + // Send our connection request. + if buffer + .encode_packet(&Packet::Connect(Connect { + protocol: Protocol::MQTT311, + keep_alive: 60, + client_id: device_id(), + clean_session: true, + last_will, + username: self.username, + password: self.password.map(|s| s.as_bytes()), + })) + .is_err() + { + error!("Failed to encode connection packet"); + return; + } - WRITE_COMPLETE.signal(()); + if let Err(e) = writer.write_all(&buffer).await { + error!("Failed to send connection packet: {:?}", e); + return; } loop { trace!("Writer waiting for data"); - WRITE_PENDING.wait().await; + let buffer = SEND_QUEUE.pop().await; - { - let mut buffer = WRITE_BUFFER.lock().await; - WRITE_PENDING.reset(); - trace!("Writer locked data"); - - if let Err(e) = writer.write_all(&buffer).await { - error!("Failed to send data: {:?}", e); - return; - } - - buffer.reset(); + trace!("Writer sending data"); + if let Err(e) = writer.write_all(&buffer).await { + error!("Failed to send data: {:?}", e); + return; } - - trace!("Writer signaling completion"); - WRITE_COMPLETE.signal(()); } } @@ -505,7 +454,7 @@ where loop { Timer::after_secs(45).await; - let _ = send_packet(Packet::Pingreq).await; + let _ = send_packet(Packet::Pingreq); } }; diff --git a/src/lib.rs b/src/lib.rs index e8b7184..caf5bcd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ mod buffer; pub mod homeassistant; mod io; mod publish; +mod queue; mod topic; // This really needs to match that used by mqttrs. diff --git a/src/queue.rs b/src/queue.rs new file mode 100644 index 0000000..0c9fb96 --- /dev/null +++ b/src/queue.rs @@ -0,0 +1,80 @@ +use core::{ + cell::RefCell, + future::Future, + pin::Pin, + task::{Context, Poll, Waker}, +}; + +use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex}; +use heapless::Deque; + +struct LossyQueueData { + receiver_waker: Option, + queue: Deque, +} + +pub(crate) struct ReceiveFuture<'a, M: RawMutex, T, const N: usize> { + pipe: &'a LossyQueue, +} + +impl Future for ReceiveFuture<'_, M, T, N> { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.pipe.inner.lock(|cell| { + let mut inner = cell.borrow_mut(); + + if let Some(waker) = inner.receiver_waker.take() { + waker.wake(); + } + + if let Some(item) = inner.queue.pop_front() { + Poll::Ready(item) + } else { + inner.receiver_waker = Some(cx.waker().clone()); + Poll::Pending + } + }) + } +} + +/// A FIFO queue holding a fixed number of items. Older items are dropped if the +/// queue is full when a new item is pushed. +pub(crate) struct LossyQueue { + inner: Mutex>>, +} + +impl LossyQueue { + pub(crate) const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(LossyQueueData { + receiver_waker: None, + queue: Deque::new(), + })), + } + } + + /// A future that waits for a new item to be available. + pub(crate) fn pop(&self) -> ReceiveFuture<'_, M, T, N> { + ReceiveFuture { pipe: self } + } + + /// Pushes an item into the queue. If the queue is already full the oldest + /// item is dropped to make space. + pub(crate) fn push(&self, data: T) { + self.inner.lock(|cell| { + let mut inner = cell.borrow_mut(); + + if inner.queue.is_full() { + inner.queue.pop_front(); + } + + // As we pop above the queue cannot be full now. + let _ = inner.queue.push_back(data); + + if let Some(waker) = inner.receiver_waker.take() { + waker.wake(); + } + }) + } +} diff --git a/src/topic.rs b/src/topic.rs index eea263d..8f6ef74 100644 --- a/src/topic.rs +++ b/src/topic.rs @@ -9,14 +9,10 @@ use mqttrs::{Packet, QoS, Subscribe, SubscribeReturnCodes, SubscribeTopic, Unsub #[cfg(feature = "serde")] use crate::publish::PublishJson; use crate::{ - device_id, - device_type, + device_id, device_type, io::{assign_pid, send_packet, subscribe}, publish::{PublishBytes, PublishDisplay}, - ControlMessage, - Error, - TopicString, - CONFIRMATION_TIMEOUT, + ControlMessage, Error, TopicString, CONFIRMATION_TIMEOUT, }; /// An MQTT topic that is optionally prefixed with the device type and unique ID. @@ -189,7 +185,7 @@ impl> Topic { let packet = Packet::Subscribe(Subscribe { pid, topics }); - send_packet(packet).await?; + send_packet(packet)?; if wait_for_ack { match select( @@ -247,7 +243,7 @@ impl> Topic { let packet = Packet::Unsubscribe(Unsubscribe { pid, topics }); - send_packet(packet).await?; + send_packet(packet)?; if wait_for_ack { match select(