Skip to content

Commit

Permalink
fix subscription error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
akiroz committed Jan 12, 2024
1 parent 272722c commit 9649b6f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 27 deletions.
39 changes: 22 additions & 17 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,39 +152,44 @@ impl Client {
&mut self,
topic: String,
message: Bytes,
) -> Result<(), etherparse::WriteError> {
let parsed = Ipv4Header::from_slice(&message);
for tunnel in self.tunnels.as_ref() {
if tunnel.topic != topic {
continue;
}
let packet_with_updated_header = match parsed {
) -> Result<bool, etherparse::WriteError> {
if let Some(tunnel) = self.tunnels.as_ref().iter().find(|&t| t.topic == topic) {
match Ipv4Header::from_slice(&message) {
Err(error) => {
log::debug!("packet parse failed {:?}", error);
message.to_vec()
Ok(false)
}
Ok((mut ipv4_header, rest)) => {
ipv4_header.source = tunnel.bind_addr.octets();
ipv4_header.destination = self.local_addr.octets();
let mut cursor = Cursor::new(Vec::new());
ipv4_header.write(&mut cursor)?;
cursor.write_all(rest)?;
cursor.into_inner()
self.sink.send(TunPacket::new(cursor.into_inner())).await?;
Ok(true)
}
};
self.sink
.send(TunPacket::new(packet_with_updated_header))
.await?;
break;
}
} else {
Ok(false)
}
Ok(())
}

pub async fn inject_message(&mut self) {
// TODO
}

pub async fn run(&mut self) {
loop {
if let Some((topic, message)) = self.remote_receiver.recv().await {
if let Err(err) = self.handle_remote_message(topic, message).await {
log::error!("handle_remote_message error {:?}", err);
match self.handle_remote_message(topic, message).await {
Err(err) => {
log::error!("handle_remote_message error {:?}", err);
}
Ok(handled) => {
if !handled {
// TODO: pass outside
}
}
}
} else {
break;
Expand Down
25 changes: 15 additions & 10 deletions src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,23 @@ impl Remote {

pub async fn subscribe(&self, topic: String) -> Result<(), mqtt::ClientError> {
let mut subs = self.subs.lock().await;
if subs.contains(&topic) {
return Ok(());
};
if subs.contains(&topic) { return Ok(()); };
subs.push(topic.clone()); // Ensure sub on reconnect
self.subscribe_ephemeral(topic).await
}

pub async fn subscribe_ephemeral(&self, topic: String) -> Result<(), mqtt::ClientError> {
for (idx, client) in self.clients.iter().enumerate() {
let res = client.mqttc.subscribe(topic.clone(), QoS::AtMostOnce).await;
if res.is_ok() {
subs.push(topic);
return Ok(());
}
if idx == self.clients.len() - 1 {
return res;
}
if !res.is_ok() || idx == self.clients.len() - 1 { return res }
}
unreachable!()
}

pub async fn unsubscribe(&self, topic: String) -> Result<(), mqtt::ClientError> {
for (idx, client) in self.clients.iter().enumerate() {
let res = client.mqttc.unsubscribe(topic.clone()).await;
if !res.is_ok() || idx == self.clients.len() - 1 { return res }
}
unreachable!()
}
Expand Down

0 comments on commit 9649b6f

Please sign in to comment.