Skip to content

Commit

Permalink
probably better
Browse files Browse the repository at this point in the history
  • Loading branch information
codabrink committed Jan 27, 2025
1 parent 7ec6825 commit 192952c
Showing 1 changed file with 77 additions and 31 deletions.
108 changes: 77 additions & 31 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,42 +516,99 @@ where
}

#[tracing::instrument(level = "trace", skip_all)]
async fn process_external_message(
async fn validate_and_process_external_message(
&self,
provider: &XmtpOpenMlsProvider,
mls_group: &mut OpenMlsGroup,
message: PrivateMessageIn,
envelope: &GroupMessageV1,
cursor: Option<i64>,
) -> Result<(), GroupMessageProcessingError> {
let GroupMessageV1 {
created_ns: envelope_timestamp_ns,
id: ref msg_id,
..
} = *envelope;

let decrypted_message = mls_group.process_message(provider, message)?;
let processed_message = mls_group.process_message(provider, message)?;
let (sender_inbox_id, sender_installation_id) =
extract_message_sender(mls_group, &decrypted_message, envelope_timestamp_ns)?;
extract_message_sender(mls_group, &processed_message, envelope_timestamp_ns)?;

tracing::info!(
inbox_id = self.client.inbox_id(),
installation_id = %self.client.installation_id(),sender_inbox_id = sender_inbox_id,
sender_installation_id = hex::encode(&sender_installation_id),
group_id = hex::encode(&self.group_id),
current_epoch = mls_group.epoch().as_u64(),
msg_epoch = decrypted_message.epoch().as_u64(),
msg_group_id = hex::encode(decrypted_message.group_id().as_slice()),
msg_epoch = processed_message.epoch().as_u64(),
msg_group_id = hex::encode(processed_message.group_id().as_slice()),
msg_id,
"[{}] extracted sender inbox id: {}",
self.client.inbox_id(),
sender_inbox_id
);

let (msg_epoch, msg_group_id) = (
decrypted_message.epoch().as_u64(),
hex::encode(decrypted_message.group_id().as_slice()),
);
match decrypted_message.into_content() {
let validated_commit = match &processed_message.content() {
ProcessedMessageContent::StagedCommitMessage(staged_commit) => {
// Validate the commit
let validated_commit = ValidatedCommit::from_staged_commit(
self.client.as_ref(),
provider.conn_ref(),
&*staged_commit,
&mls_group,
)
.await?;

Some(validated_commit)
}
_ => None,
};

provider.transaction(|provider| {
if let Some(cursor) = cursor {
let is_updated = provider.conn_ref().update_cursor(
&envelope.group_id,
EntityKind::Group,
cursor,
)?;
if !is_updated {
return Err(ProcessIntentError::AlreadyProcessed(cursor as u64).into());
}
}

self.process_external_message(
provider,
mls_group,
processed_message,
envelope,
validated_commit,
)
})?;

Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
fn process_external_message(
&self,
provider: &XmtpOpenMlsProvider,
mls_group: &mut OpenMlsGroup,
processed_message: ProcessedMessage,
envelope: &GroupMessageV1,
validated_commit: Option<ValidatedCommit>,
) -> Result<(), GroupMessageProcessingError> {
let GroupMessageV1 {
created_ns: envelope_timestamp_ns,
id: ref msg_id,
..
} = *envelope;

let msg_epoch = processed_message.epoch().as_u64();
let msg_group_id = hex::encode(processed_message.group_id().as_slice());
let (sender_inbox_id, sender_installation_id) =
extract_message_sender(mls_group, &processed_message, envelope_timestamp_ns)?;

match processed_message.into_content() {
ProcessedMessageContent::ApplicationMessage(application_message) => {
tracing::info!(
inbox_id = self.client.inbox_id(),
Expand Down Expand Up @@ -698,6 +755,8 @@ where
}
ProcessedMessageContent::StagedCommitMessage(staged_commit) => {
let staged_commit = *staged_commit;
let validated_commit =
validated_commit.expect("Needs to be present when this is a staged commit");

tracing::info!(
inbox_id = self.client.inbox_id(),
Expand All @@ -712,14 +771,6 @@ where
self.context().inbox_id()
);

// Validate the commit
let validated_commit = ValidatedCommit::from_staged_commit(
self.client.as_ref(),
provider.conn_ref(),
&staged_commit,
&mls_group,
)
.await?;
tracing::info!(
inbox_id = self.client.inbox_id(),
sender_inbox_id = sender_inbox_id,
Expand Down Expand Up @@ -850,19 +901,14 @@ where
);

self.load_mls_group_with_lock_async(provider, |mut mls_group| async move {
if let Some(cursor) = cursor {
let is_updated = provider.conn_ref().update_cursor(
&envelope.group_id,
EntityKind::Group,
cursor,
)?;
if !is_updated {
return Err(ProcessIntentError::AlreadyProcessed(cursor as u64).into());
}
}

self.process_external_message(provider, &mut mls_group, message, envelope)
.await
self.validate_and_process_external_message(
provider,
&mut mls_group,
message,
envelope,
cursor,
)
.await
})
.await?;

Expand Down

0 comments on commit 192952c

Please sign in to comment.