Skip to content

Commit

Permalink
extract disappearing messages worker to a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
mchenani committed Jan 21, 2025
1 parent 354b7e6 commit 1b11016
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 70 deletions.
73 changes: 3 additions & 70 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{GroupError, MlsGroup};
use crate::groups::disappearing_messages::DisappearingMessagesCleanerWorker;
#[cfg(any(test, feature = "test-utils"))]
pub use crate::utils::WorkerHandle;
use crate::{
Expand Down Expand Up @@ -107,8 +108,6 @@ pub enum DeviceSyncError {
Bincode(#[from] bincode::Error),
}

#[derive(Debug, Error)]
pub enum ExpirationWorkerError {}
impl RetryableError for DeviceSyncError {
fn is_retryable(&self) -> bool {
true
Expand Down Expand Up @@ -142,15 +141,15 @@ where
}

#[instrument(level = "trace", skip_all)]
pub fn start_expired_messages_cleaner_worker(&self) {
pub fn start_disappearing_messages_cleaner_worker(&self) {
let client = self.clone();
tracing::debug!(
inbox_id = client.inbox_id(),
installation_id = hex::encode(client.installation_public_key()),
"starting expired messages cleaner worker"
);

let worker = MessageExpirationWorker::new(client);
let worker = DisappearingMessagesCleanerWorker::new(client);
worker.spawn_worker();
}
}
Expand Down Expand Up @@ -376,72 +375,6 @@ where
}
}

pub struct MessageExpirationWorker<ApiClient, V> {
client: Client<ApiClient, V>,
init: OnceCell<()>,
}
impl<ApiClient, V> MessageExpirationWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
fn new(client: Client<ApiClient, V>) -> Self {
Self {
client,
init: OnceCell::new(),
}
}
fn spawn_worker(mut self) {
crate::spawn(None, async move {
let inbox_id = self.client.inbox_id().to_string();
let installation_id = hex::encode(self.client.installation_public_key());
while let Err(err) = self.run().await {
tracing::info!("Running worker..");
match err {
DeviceSyncError::Client(ClientError::Storage(
StorageError::PoolNeedsConnection,
)) => {
tracing::warn!(
inbox_id,
installation_id,
"Pool disconnected. task will restart on reconnect"
);
break;
}
_ => {
tracing::error!(inbox_id, installation_id, "sync worker error {err}");
// Wait 2 seconds before restarting.
xmtp_common::time::sleep(Duration::from_secs(2)).await;
}
}
}
});
}
}

impl<ApiClient, V> MessageExpirationWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
/// Iterate on the list of groups and delete expired messages
async fn delete_expired_messages(&mut self) -> Result<(), DeviceSyncError> {
let provider = self.client.mls_provider()?;
if let Err(e) = provider.conn_ref().delete_expired_messages() {
tracing::error!("Failed to delete expired messages, error: {:?}", e);
}
Ok(())
}

async fn run(&mut self) -> Result<(), DeviceSyncError> {
// Call delete_expired_messages on every iteration
if let Err(err) = self.delete_expired_messages().await {
tracing::error!("Error during deletion of expired messages: {:?}", err);
}
Ok(())
}
}

impl<ApiClient, V> Client<ApiClient, V>
where
ApiClient: XmtpApi,
Expand Down
81 changes: 81 additions & 0 deletions xmtp_mls/src/groups/disappearing_messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use crate::client::ClientError;
use crate::storage::StorageError;
use crate::Client;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::OnceCell;
use xmtp_id::scw_verifier::SmartContractSignatureVerifier;
use xmtp_proto::api_client::trait_impls::XmtpApi;

#[derive(Debug, Error)]
pub enum DisappearingMessagesCleanerError {
#[error("storage error: {0}")]
Storage(#[from] StorageError),
#[error("client error: {0}")]
Client(#[from] ClientError),
}

pub struct DisappearingMessagesCleanerWorker<ApiClient, V> {
client: Client<ApiClient, V>,
init: OnceCell<()>,
}
impl<ApiClient, V> DisappearingMessagesCleanerWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
pub fn new(client: Client<ApiClient, V>) -> Self {
Self {
client,
init: OnceCell::new(),
}
}
pub(crate) fn spawn_worker(mut self) {
crate::spawn(None, async move {
let inbox_id = self.client.inbox_id().to_string();
let installation_id = hex::encode(self.client.installation_public_key());
while let Err(err) = self.run().await {
tracing::info!("Running worker..");
match err {
DisappearingMessagesCleanerError::Client(ClientError::Storage(
StorageError::PoolNeedsConnection,
)) => {
tracing::warn!(
inbox_id,
installation_id,
"Pool disconnected. task will restart on reconnect"
);
break;
}
_ => {
tracing::error!(inbox_id, installation_id, "sync worker error {err}");
// Wait 2 seconds before restarting.
xmtp_common::time::sleep(Duration::from_secs(2)).await;
}
}
}
});
}
}

impl<ApiClient, V> DisappearingMessagesCleanerWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
/// Iterate on the list of groups and delete expired messages
async fn delete_expired_messages(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
let provider = self.client.mls_provider()?;
if let Err(e) = provider.conn_ref().delete_expired_messages() {
tracing::error!("Failed to delete expired messages, error: {:?}", e);
}
Ok(())
}

async fn run(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
if let Err(err) = self.delete_expired_messages().await {
tracing::error!("Error during deletion of expired messages: {:?}", err);
}
Ok(())
}
}
1 change: 1 addition & 0 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod intents;
pub mod members;
pub mod scoped_client;

mod disappearing_messages;
pub(super) mod mls_sync;
pub(super) mod subscriptions;
pub mod validated_commit;
Expand Down

0 comments on commit 1b11016

Please sign in to comment.