Skip to content

Commit

Permalink
Make lapin_consumer withstand downtime of rabbitmq server
Browse files Browse the repository at this point in the history
  • Loading branch information
pkakelas authored and themicp committed Jan 30, 2024
1 parent efaf306 commit a624b81
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 17 deletions.
5 changes: 3 additions & 2 deletions relayer/src/bin/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ async fn main() {
);

let consumer = LapinConsumer::new(
&config.sentinel_queue_addr,
config.sentinel_queue_addr.clone(),
config.sentinel_queue_name.clone(),
)
.await;
.await
.unwrap();

let mut relayer = Relayer::new(
config.clone(),
Expand Down
44 changes: 30 additions & 14 deletions relayer/src/consumers/lapin_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
use std::time::Duration;

use super::Amqp;
use async_trait::async_trait;
use eyre::{eyre, Result};
use lapin::{
options::{BasicAckOptions, BasicGetOptions, BasicNackOptions},
Channel, Connection, ConnectionProperties,
};
use log::{debug, info};
use log::{debug, error, info};
use mockall::automock;

pub struct LapinConsumer {
queue_addr: String,
queue_name: String,
channel: Channel,
}

impl LapinConsumer {
pub async fn new(queue_addr: &str, queue_name: String) -> Self {
let connection = Connection::connect(queue_addr, ConnectionProperties::default())
.await
.unwrap();
let channel = connection.create_channel().await.unwrap();
pub async fn new(queue_addr: String, queue_name: String) -> Result<Self> {
let connection =
Connection::connect(queue_addr.as_str(), ConnectionProperties::default()).await?;
let channel = connection.create_channel().await?;

Self {
Ok(Self {
queue_addr,
queue_name,
channel,
}
})
}
}

Expand All @@ -36,17 +39,30 @@ impl Amqp for LapinConsumer {
while deliveries.len() < max_deliveries {
let message = self
.channel
.basic_get(self.queue_name.as_str(), BasicGetOptions::default())
.await?;
.basic_get(&self.queue_name, BasicGetOptions::default())
.await;

match message {
Some(message) => {
debug!("Got message: {:?}", message.delivery);
Ok(Some(message)) => {
println!("Got message: {:?}", message.delivery);
deliveries.push(message.delivery);
}
None => {
Ok(None) => {
debug!("Queue is empty");
break; // Queue is empty, break the loop
break;
}
Err(e) => {
error!("Connection is lost due to {:?}. Will retry in 10 secs", e);
tokio::time::sleep(Duration::from_secs(10)).await;

// Clear the deliveries vector, since the delivery tags are no longer valid
deliveries.clear();

let new_instance =
LapinConsumer::new(self.queue_addr.clone(), self.queue_name.clone()).await;
if let Ok(new_instance) = new_instance {
*self = new_instance;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl<C: Amqp, P: ProverAPI, CR: EthBeaconAPI, ER: EthExecutionAPI, V: VerifierAP
return Ok(false);
}

debug!("STATE WOULD PASS");
info!("State is in the cache of the state_prover");
Ok(true)
}
}
Expand Down

0 comments on commit a624b81

Please sign in to comment.