diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index a9dfbdbc8..3312b9be7 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -19,6 +19,7 @@ use xmtp_id::{ InboxId, }; use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate; +use xmtp_mls::groups::group_mutable_metadata::MessageDisappearingSettings; use xmtp_mls::groups::scoped_client::LocalScopedGroupClient; use xmtp_mls::groups::HmacKey; use xmtp_mls::storage::group::ConversationType; @@ -789,7 +790,7 @@ pub struct FfiPermissionPolicySet { pub update_group_description_policy: FfiPermissionPolicy, pub update_group_image_url_square_policy: FfiPermissionPolicy, pub update_group_pinned_frame_url_policy: FfiPermissionPolicy, - pub update_message_expiration_ms_policy: FfiPermissionPolicy, + pub update_message_disappearing_policy: FfiPermissionPolicy, } impl From for FfiGroupPermissionsOptions { @@ -821,17 +822,17 @@ impl TryFrom for PolicySet { MetadataField::GroupPinnedFrameUrl.to_string(), policy_set.update_group_pinned_frame_url_policy.try_into()?, ); - // MessageExpirationFromMillis follows the same policy as MessageExpirationMillis + // MessageDisappearFromNS follows the same policy as MessageDisappearInNS metadata_permissions_map.insert( - MetadataField::MessageExpirationFromMillis.to_string(), + MetadataField::MessageDisappearFromNS.to_string(), policy_set - .update_message_expiration_ms_policy + .update_message_disappearing_policy .clone() .try_into()?, ); metadata_permissions_map.insert( - MetadataField::MessageExpirationMillis.to_string(), - policy_set.update_message_expiration_ms_policy.try_into()?, + MetadataField::MessageDisappearInNS.to_string(), + policy_set.update_message_disappearing_policy.try_into()?, ); Ok(PolicySet { @@ -1297,6 +1298,30 @@ impl FfiConversationListItem { } } +/// Settings for disappearing messages in a conversation. +/// +/// # Fields +/// +/// * `from_ns` - The timestamp (in nanoseconds) from when messages should be tracked for deletion. +/// * `in_ns` - The duration (in nanoseconds) after which tracked messages will be deleted. +#[derive(uniffi::Record, Clone, Debug)] +pub struct FfiMessageDisappearingSettings { + pub from_ns: i64, + pub in_ns: i64, +} + +impl FfiMessageDisappearingSettings { + fn new(from_ns: i64, in_ns: i64) -> Self { + Self { from_ns, in_ns } + } +} + +impl From for FfiMessageDisappearingSettings { + fn from(value: MessageDisappearingSettings) -> Self { + FfiMessageDisappearingSettings::new(value.from_ns, value.in_ns) + } +} + impl From> for FfiConversation { fn from(mls_group: MlsGroup) -> FfiConversation { FfiConversation { inner: mls_group } @@ -1407,6 +1432,12 @@ impl From for SortDirection { } } +impl From for MessageDisappearingSettings { + fn from(settings: FfiMessageDisappearingSettings) -> Self { + MessageDisappearingSettings::new(settings.from_ns, settings.in_ns) + } +} + #[derive(uniffi::Record, Clone, Default)] pub struct FfiListMessagesOptions { pub sent_before_ns: Option, @@ -1456,8 +1487,7 @@ pub struct FfiCreateGroupOptions { pub group_description: Option, pub group_pinned_frame_url: Option, pub custom_permission_policy_set: Option, - pub message_expiration_from_ms: Option, - pub message_expiration_ms: Option, + pub message_disappearing_settings: Option, } impl FfiCreateGroupOptions { @@ -1467,8 +1497,9 @@ impl FfiCreateGroupOptions { image_url_square: self.group_image_url_square, description: self.group_description, pinned_frame_url: self.group_pinned_frame_url, - message_expiration_from_ms: self.message_expiration_from_ms, - message_expiration_ms: self.message_expiration_ms, + message_disappearing_settings: self + .message_disappearing_settings + .map(|settings| settings.into()), } } } @@ -1699,6 +1730,43 @@ impl FfiConversation { .map_err(Into::into) } + pub async fn update_conversation_message_disappearing_settings( + &self, + settings: FfiMessageDisappearingSettings, + ) -> Result<(), GenericError> { + self.inner + .update_conversation_message_disappearing_settings(MessageDisappearingSettings::from( + settings, + )) + .await?; + + Ok(()) + } + + pub async fn remove_conversation_message_disappearing_settings( + &self, + ) -> Result<(), GenericError> { + self.inner + .remove_conversation_message_disappearing_settings() + .await?; + + Ok(()) + } + + pub fn conversation_message_disappearing_settings( + &self, + ) -> Result { + let provider = self.inner.mls_provider()?; + let group_message_expiration_settings = self + .inner + .conversation_message_disappearing_settings(&provider)?; + + Ok(FfiMessageDisappearingSettings::new( + group_message_expiration_settings.from_ns, + group_message_expiration_settings.in_ns, + )) + } + pub fn admin_list(&self) -> Result, GenericError> { let provider = self.inner.mls_provider()?; self.inner.admin_list(&provider).map_err(Into::into) @@ -2218,8 +2286,8 @@ impl FfiGroupPermissions { update_group_pinned_frame_url_policy: get_policy( MetadataField::GroupPinnedFrameUrl.as_str(), ), - update_message_expiration_ms_policy: get_policy( - MetadataField::MessageExpirationMillis.as_str(), + update_message_disappearing_policy: get_policy( + MetadataField::MessageDisappearInNS.as_str(), ), }) } @@ -2236,9 +2304,10 @@ mod tests { inbox_owner::SigningError, FfiConsent, FfiConsentEntityType, FfiConsentState, FfiContentType, FfiConversation, FfiConversationCallback, FfiConversationMessageKind, FfiCreateGroupOptions, FfiDirection, FfiGroupPermissionsOptions, FfiInboxOwner, - FfiListConversationsOptions, FfiListMessagesOptions, FfiMessageWithReactions, - FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType, - FfiReaction, FfiReactionAction, FfiReactionSchema, FfiSubscribeError, + FfiListConversationsOptions, FfiListMessagesOptions, FfiMessageDisappearingSettings, + FfiMessageWithReactions, FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet, + FfiPermissionUpdateType, FfiReaction, FfiReactionAction, FfiReactionSchema, + FfiSubscribeError, }; use ethers::utils::hex; use prost::Message; @@ -2936,6 +3005,9 @@ mod tests { let amal = new_test_client().await; let bola = new_test_client().await; + let conversation_message_disappearing_settings = + FfiMessageDisappearingSettings::new(10, 100); + let group = amal .conversations() .create_group( @@ -2947,8 +3019,9 @@ mod tests { group_description: Some("group description".to_string()), group_pinned_frame_url: Some("pinned frame".to_string()), custom_permission_policy_set: None, - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: Some( + conversation_message_disappearing_settings.clone(), + ), }, ) .await @@ -2960,6 +3033,21 @@ mod tests { assert_eq!(group.group_image_url_square().unwrap(), "url"); assert_eq!(group.group_description().unwrap(), "group description"); assert_eq!(group.group_pinned_frame_url().unwrap(), "pinned frame"); + assert_eq!(group.group_pinned_frame_url().unwrap(), "pinned frame"); + assert_eq!( + group + .conversation_message_disappearing_settings() + .unwrap() + .from_ns, + conversation_message_disappearing_settings.clone().from_ns + ); + assert_eq!( + group + .conversation_message_disappearing_settings() + .unwrap() + .in_ns, + conversation_message_disappearing_settings.in_ns + ); } // Looks like this test might be a separate issue @@ -4459,7 +4547,7 @@ mod tests { update_group_description_policy: FfiPermissionPolicy::Admin, update_group_image_url_square_policy: FfiPermissionPolicy::Admin, update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; assert_eq!(alix_permission_policy_set, expected_permission_policy_set); @@ -4489,7 +4577,7 @@ mod tests { update_group_description_policy: FfiPermissionPolicy::Allow, update_group_image_url_square_policy: FfiPermissionPolicy::Allow, update_group_pinned_frame_url_policy: FfiPermissionPolicy::Allow, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; assert_eq!(alix_permission_policy_set, expected_permission_policy_set); } @@ -4520,7 +4608,7 @@ mod tests { update_group_description_policy: FfiPermissionPolicy::Allow, update_group_image_url_square_policy: FfiPermissionPolicy::Allow, update_group_pinned_frame_url_policy: FfiPermissionPolicy::Allow, - update_message_expiration_ms_policy: FfiPermissionPolicy::Allow, + update_message_disappearing_policy: FfiPermissionPolicy::Allow, }; assert_eq!(alix_permission_policy_set, expected_permission_policy_set); @@ -4550,7 +4638,7 @@ mod tests { update_group_description_policy: FfiPermissionPolicy::Allow, update_group_image_url_square_policy: FfiPermissionPolicy::Allow, update_group_pinned_frame_url_policy: FfiPermissionPolicy::Allow, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; assert_eq!(alix_permission_policy_set, expected_permission_policy_set); } @@ -4584,7 +4672,7 @@ mod tests { update_group_description_policy: FfiPermissionPolicy::Admin, update_group_image_url_square_policy: FfiPermissionPolicy::Admin, update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; assert_eq!(alix_group_permissions, expected_permission_policy_set); @@ -4612,7 +4700,7 @@ mod tests { update_group_description_policy: FfiPermissionPolicy::Admin, update_group_image_url_square_policy: FfiPermissionPolicy::Allow, update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; assert_eq!(alix_group_permissions, new_expected_permission_policy_set); @@ -4652,6 +4740,117 @@ mod tests { assert_eq!(alix_group.group_name().unwrap(), ""); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_disappearing_messages_deletion() { + let alix = new_test_client().await; + let alix_provider = alix.inner_client.mls_provider().unwrap(); + + // Step 1: Create a group + let alix_group = alix + .conversations() + .create_group(vec![], FfiCreateGroupOptions::default()) + .await + .unwrap(); + + // Step 2: Send a message and sync + alix_group + .send("Msg 1 from group".as_bytes().to_vec()) + .await + .unwrap(); + alix_group.sync().await.unwrap(); + + // Step 3: Verify initial messages + let mut alix_messages = alix_group + .find_messages(FfiListMessagesOptions::default()) + .await + .unwrap(); + assert_eq!(alix_messages.len(), 1); + + // Step 4: Set disappearing settings to 5ns after the latest message + let latest_message_sent_at_ns = alix_messages.last().unwrap().sent_at_ns; + let disappearing_settings = + FfiMessageDisappearingSettings::new(latest_message_sent_at_ns, 5); + alix_group + .update_conversation_message_disappearing_settings(disappearing_settings.clone()) + .await + .unwrap(); + alix_group.sync().await.unwrap(); + + // Verify the settings were applied + let group_from_db = alix_provider + .conn_ref() + .find_group(alix_group.id()) + .unwrap(); + assert_eq!( + group_from_db + .clone() + .unwrap() + .message_disappear_from_ns + .unwrap(), + disappearing_settings.from_ns + ); + assert_eq!( + group_from_db.unwrap().message_disappear_in_ns.unwrap(), + disappearing_settings.in_ns + ); + + // Step 5: Send additional messages + for msg in &["Msg 2 from group", "Msg 3 from group", "Msg 4 from group"] { + alix_group.send(msg.as_bytes().to_vec()).await.unwrap(); + } + alix_group.sync().await.unwrap(); + + // Step 6: Verify total message count before cleanup + alix_messages = alix_group + .find_messages(FfiListMessagesOptions::default()) + .await + .unwrap(); + let msg_counts_before_cleanup = alix_messages.len(); + + // Step 7: Start cleanup worker and delete expired messages + alix.inner_client + .start_disappearing_messages_cleaner_worker(); + + // Wait for cleanup to complete + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Step 8: Disable disappearing messages + alix_group + .remove_conversation_message_disappearing_settings() + .await + .unwrap(); + alix_group.sync().await.unwrap(); + + // Verify disappearing settings are disabled + let group_from_db = alix_provider + .conn_ref() + .find_group(alix_group.id()) + .unwrap(); + assert_eq!( + group_from_db + .clone() + .unwrap() + .message_disappear_from_ns + .unwrap(), + 0 + ); + assert_eq!(group_from_db.unwrap().message_disappear_in_ns.unwrap(), 0); + + // Step 9: Send another message + alix_group + .send("Msg 5 from group".as_bytes().to_vec()) + .await + .unwrap(); + + // Step 10: Verify messages after cleanup + alix_messages = alix_group + .find_messages(FfiListMessagesOptions::default()) + .await + .unwrap(); + assert_eq!(msg_counts_before_cleanup, alix_messages.len()); + // 3 messages got deleted, then two messages got added for metadataUpdate and one normal messaged added later + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_group_creation_custom_permissions() { let alix = new_test_client().await; @@ -4666,7 +4865,7 @@ mod tests { update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin, add_member_policy: FfiPermissionPolicy::Allow, remove_member_policy: FfiPermissionPolicy::Deny, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; let create_group_options = FfiCreateGroupOptions { @@ -4676,8 +4875,7 @@ mod tests { group_description: Some("A test group".to_string()), group_pinned_frame_url: Some("https://example.com/frame.png".to_string()), custom_permission_policy_set: Some(custom_permissions), - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: None, }; let alix_group = alix @@ -4717,7 +4915,7 @@ mod tests { FfiPermissionPolicy::Admin ); assert_eq!( - group_permissions_policy_set.update_message_expiration_ms_policy, + group_permissions_policy_set.update_message_disappearing_policy, FfiPermissionPolicy::Admin ); assert_eq!( @@ -4783,7 +4981,7 @@ mod tests { update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin, add_member_policy: FfiPermissionPolicy::Allow, remove_member_policy: FfiPermissionPolicy::Deny, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; let custom_permissions_valid = FfiPermissionPolicySet { @@ -4795,7 +4993,7 @@ mod tests { update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin, add_member_policy: FfiPermissionPolicy::Allow, remove_member_policy: FfiPermissionPolicy::Deny, - update_message_expiration_ms_policy: FfiPermissionPolicy::Admin, + update_message_disappearing_policy: FfiPermissionPolicy::Admin, }; let create_group_options_invalid_1 = FfiCreateGroupOptions { @@ -4805,8 +5003,7 @@ mod tests { group_description: Some("A test group".to_string()), group_pinned_frame_url: Some("https://example.com/frame.png".to_string()), custom_permission_policy_set: Some(custom_permissions_invalid_1), - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: None, }; let results_1 = alix @@ -4826,8 +5023,7 @@ mod tests { group_description: Some("A test group".to_string()), group_pinned_frame_url: Some("https://example.com/frame.png".to_string()), custom_permission_policy_set: Some(custom_permissions_valid.clone()), - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: None, }; let results_2 = alix @@ -4847,8 +5043,7 @@ mod tests { group_description: Some("A test group".to_string()), group_pinned_frame_url: Some("https://example.com/frame.png".to_string()), custom_permission_policy_set: Some(custom_permissions_valid.clone()), - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: None, }; let results_3 = alix @@ -4868,8 +5063,7 @@ mod tests { group_description: Some("A test group".to_string()), group_pinned_frame_url: Some("https://example.com/frame.png".to_string()), custom_permission_policy_set: Some(custom_permissions_valid), - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: None, }; let results_4 = alix diff --git a/bindings_node/src/conversation.rs b/bindings_node/src/conversation.rs index 98c27179b..1100727d2 100644 --- a/bindings_node/src/conversation.rs +++ b/bindings_node/src/conversation.rs @@ -28,8 +28,8 @@ use crate::{ streams::StreamCloser, ErrorWrapper, }; - use prost::Message as ProstMessage; +use xmtp_mls::groups::group_mutable_metadata::MessageDisappearingSettings as XmtpConversationMessageDisappearingSettings; use napi_derive::napi; @@ -38,6 +38,33 @@ pub struct GroupMetadata { inner: XmtpGroupMetadata, } +/// Settings for disappearing messages in a conversation. +/// +/// # Fields +/// +/// * `from_ns` - The timestamp (in nanoseconds) from when messages should be tracked for deletion. +/// * `in_ns` - The duration (in nanoseconds) after which tracked messages will be deleted. +#[napi(object)] +#[derive(Clone)] +pub struct MessageDisappearingSettings { + pub from_ns: i64, + pub in_ns: i64, +} + +#[napi] +impl MessageDisappearingSettings { + #[napi] + pub fn new(from_ns: i64, in_ns: i64) -> Self { + Self { from_ns, in_ns } + } +} + +impl From for XmtpConversationMessageDisappearingSettings { + fn from(value: MessageDisappearingSettings) -> Self { + XmtpConversationMessageDisappearingSettings::new(value.from_ns, value.in_ns) + } +} + #[napi] impl GroupMetadata { #[napi] diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index 7111e7fcc..70f5fd170 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -12,6 +12,7 @@ use xmtp_mls::storage::group::ConversationType as XmtpConversationType; use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState; use xmtp_mls::storage::group::GroupQueryArgs; +use crate::conversation::MessageDisappearingSettings; use crate::message::Message; use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet}; use crate::ErrorWrapper; @@ -122,8 +123,7 @@ pub struct CreateGroupOptions { pub group_description: Option, pub group_pinned_frame_url: Option, pub custom_permission_policy_set: Option, - pub message_expiration_from_ms: Option, - pub message_expiration_ms: Option, + pub message_disappearing_settings: Option, } impl CreateGroupOptions { @@ -133,8 +133,9 @@ impl CreateGroupOptions { image_url_square: self.group_image_url_square, description: self.group_description, pinned_frame_url: self.group_pinned_frame_url, - message_expiration_from_ms: self.message_expiration_from_ms, - message_expiration_ms: self.message_expiration_ms, + message_disappearing_settings: self + .message_disappearing_settings + .map(|settings| settings.into()), } } } @@ -163,8 +164,7 @@ impl Conversations { group_description: None, group_pinned_frame_url: None, custom_permission_policy_set: None, - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: None, }); if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions { diff --git a/bindings_node/src/permissions.rs b/bindings_node/src/permissions.rs index 1eb667f64..e3077dd81 100644 --- a/bindings_node/src/permissions.rs +++ b/bindings_node/src/permissions.rs @@ -161,7 +161,7 @@ pub struct PermissionPolicySet { pub update_group_description_policy: PermissionPolicy, pub update_group_image_url_square_policy: PermissionPolicy, pub update_group_pinned_frame_url_policy: PermissionPolicy, - pub update_message_expiration_ms_policy: PermissionPolicy, + pub update_message_disappearing_policy: PermissionPolicy, } impl From for GroupPermissionsOptions { @@ -216,8 +216,8 @@ impl GroupPermissions { update_group_pinned_frame_url_policy: get_policy( XmtpMetadataField::GroupPinnedFrameUrl.as_str(), ), - update_message_expiration_ms_policy: get_policy( - XmtpMetadataField::MessageExpirationMillis.as_str(), + update_message_disappearing_policy: get_policy( + XmtpMetadataField::MessageDisappearInNS.as_str(), ), }) } @@ -246,8 +246,8 @@ impl TryFrom for PolicySet { policy_set.update_group_pinned_frame_url_policy.try_into()?, ); metadata_permissions_map.insert( - XmtpMetadataField::MessageExpirationMillis.to_string(), - policy_set.update_message_expiration_ms_policy.try_into()?, + XmtpMetadataField::MessageDisappearInNS.to_string(), + policy_set.update_message_disappearing_policy.try_into()?, ); Ok(PolicySet { diff --git a/bindings_node/test/Conversations.test.ts b/bindings_node/test/Conversations.test.ts index ad280521b..0c8ed0292 100644 --- a/bindings_node/test/Conversations.test.ts +++ b/bindings_node/test/Conversations.test.ts @@ -52,7 +52,7 @@ describe('Conversations', () => { updateGroupDescriptionPolicy: 0, updateGroupImageUrlSquarePolicy: 0, updateGroupPinnedFrameUrlPolicy: 0, - updateMessageExpirationMsPolicy: 2, + updateMessageDisappearingPolicy: 2, }) expect(group.addedByInboxId()).toBe(client1.inboxId()) expect((await group.findMessages()).length).toBe(1) @@ -104,7 +104,7 @@ describe('Conversations', () => { updateGroupDescriptionPolicy: 1, updateGroupImageUrlSquarePolicy: 0, updateGroupPinnedFrameUrlPolicy: 3, - updateMessageExpirationMsPolicy: 2, + updateMessageDisappearingPolicy: 2, }, }) expect(group).toBeDefined() @@ -120,7 +120,7 @@ describe('Conversations', () => { updateGroupDescriptionPolicy: 1, updateGroupImageUrlSquarePolicy: 0, updateGroupPinnedFrameUrlPolicy: 3, - updateMessageExpirationMsPolicy: 2, + updateMessageDisappearingPolicy: 2, }) }) @@ -142,7 +142,7 @@ describe('Conversations', () => { updateGroupDescriptionPolicy: 0, updateGroupImageUrlSquarePolicy: 0, updateGroupPinnedFrameUrlPolicy: 0, - updateMessageExpirationMsPolicy: 2, + updateMessageDisappearingPolicy: 2, }) await group.updatePermissionPolicy( @@ -159,7 +159,7 @@ describe('Conversations', () => { updateGroupDescriptionPolicy: 0, updateGroupImageUrlSquarePolicy: 0, updateGroupPinnedFrameUrlPolicy: 0, - updateMessageExpirationMsPolicy: 2, + updateMessageDisappearingPolicy: 2, }) await group.updatePermissionPolicy( @@ -177,7 +177,7 @@ describe('Conversations', () => { updateGroupDescriptionPolicy: 0, updateGroupImageUrlSquarePolicy: 0, updateGroupPinnedFrameUrlPolicy: 0, - updateMessageExpirationMsPolicy: 2, + updateMessageDisappearingPolicy: 2, }) }) @@ -204,7 +204,7 @@ describe('Conversations', () => { updateGroupImageUrlSquarePolicy: 0, updateGroupNamePolicy: 0, updateGroupPinnedFrameUrlPolicy: 0, - updateMessageExpirationMsPolicy: 0, + updateMessageDisappearingPolicy: 0, }) expect(group.addedByInboxId()).toBe(client1.inboxId()) expect((await group.findMessages()).length).toBe(0) @@ -342,7 +342,7 @@ describe('Conversations', () => { updateGroupDescriptionPolicy: 2, updateGroupImageUrlSquarePolicy: 2, updateGroupPinnedFrameUrlPolicy: 2, - updateMessageExpirationMsPolicy: 2, + updateMessageDisappearingPolicy: 2, }) const groupWithDescription = await client1 diff --git a/bindings_wasm/src/conversation.rs b/bindings_wasm/src/conversation.rs index 6509c5873..d8874b066 100644 --- a/bindings_wasm/src/conversation.rs +++ b/bindings_wasm/src/conversation.rs @@ -18,12 +18,29 @@ use xmtp_mls::storage::group_message::{GroupMessageKind as XmtpGroupMessageKind, use xmtp_proto::xmtp::mls::message_contents::EncodedContent as XmtpEncodedContent; use prost::Message as ProstMessage; +use xmtp_mls::groups::group_mutable_metadata::MessageDisappearingSettings as XmtpMessageDisappearingSettings; #[wasm_bindgen] pub struct GroupMetadata { inner: XmtpGroupMetadata, } +#[wasm_bindgen] +#[derive(Clone)] +pub struct MessageDisappearingSettings { + #[allow(dead_code)] + inner: XmtpMessageDisappearingSettings, +} + +impl From for XmtpMessageDisappearingSettings { + fn from(value: MessageDisappearingSettings) -> Self { + Self { + from_ns: value.inner.from_ns, + in_ns: value.inner.in_ns, + } + } +} + #[wasm_bindgen] impl GroupMetadata { #[wasm_bindgen(js_name = creatorInboxId)] diff --git a/bindings_wasm/src/conversations.rs b/bindings_wasm/src/conversations.rs index 6eeb0f426..1daa849bb 100644 --- a/bindings_wasm/src/conversations.rs +++ b/bindings_wasm/src/conversations.rs @@ -7,6 +7,7 @@ use xmtp_mls::storage::group::ConversationType as XmtpConversationType; use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState; use xmtp_mls::storage::group::GroupQueryArgs; +use crate::conversation::MessageDisappearingSettings; use crate::messages::Message; use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet}; use crate::{client::RustXmtpClient, conversation::Conversation}; @@ -130,10 +131,8 @@ pub struct CreateGroupOptions { pub group_pinned_frame_url: Option, #[wasm_bindgen(js_name = customPermissionPolicySet)] pub custom_permission_policy_set: Option, - #[wasm_bindgen(js_name = messageExpirationFromMillis)] - pub message_expiration_from_ms: Option, - #[wasm_bindgen(js_name = messageExpirationMillis)] - pub message_expiration_ms: Option, + #[wasm_bindgen(js_name = messageDisappearingSettings)] + pub message_disappearing_settings: Option, } #[wasm_bindgen] @@ -147,8 +146,7 @@ impl CreateGroupOptions { group_description: Option, group_pinned_frame_url: Option, custom_permission_policy_set: Option, - message_expiration_from_ms: Option, - message_expiration_ms: Option, + message_disappearing_settings: Option, ) -> Self { Self { permissions, @@ -157,8 +155,7 @@ impl CreateGroupOptions { group_description, group_pinned_frame_url, custom_permission_policy_set, - message_expiration_from_ms, - message_expiration_ms, + message_disappearing_settings, } } } @@ -170,8 +167,9 @@ impl CreateGroupOptions { image_url_square: self.group_image_url_square, description: self.group_description, pinned_frame_url: self.group_pinned_frame_url, - message_expiration_from_ms: self.message_expiration_from_ms, - message_expiration_ms: self.message_expiration_ms, + message_disappearing_settings: self + .message_disappearing_settings + .map(|settings| settings.into()), } } } @@ -218,8 +216,7 @@ impl Conversations { group_description: None, group_pinned_frame_url: None, custom_permission_policy_set: None, - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: None, }); if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions { diff --git a/bindings_wasm/src/permissions.rs b/bindings_wasm/src/permissions.rs index 2baa7ca74..3256b9eeb 100644 --- a/bindings_wasm/src/permissions.rs +++ b/bindings_wasm/src/permissions.rs @@ -170,8 +170,8 @@ pub struct PermissionPolicySet { pub update_group_image_url_square_policy: PermissionPolicy, #[wasm_bindgen(js_name = updateGroupPinnedFrameUrlPolicy)] pub update_group_pinned_frame_url_policy: PermissionPolicy, - #[wasm_bindgen(js_name = updateMessageExpirationPolicy)] - pub update_message_expiration_ms_policy: PermissionPolicy, + #[wasm_bindgen(js_name = updateMessageDisappearingPolicy)] + pub update_message_disappearing_policy: PermissionPolicy, } #[wasm_bindgen] @@ -187,7 +187,7 @@ impl PermissionPolicySet { update_group_description_policy: PermissionPolicy, update_group_image_url_square_policy: PermissionPolicy, update_group_pinned_frame_url_policy: PermissionPolicy, - update_message_expiration_ms_policy: PermissionPolicy, + update_message_disappearing_policy: PermissionPolicy, ) -> Self { Self { add_member_policy, @@ -198,7 +198,7 @@ impl PermissionPolicySet { update_group_description_policy, update_group_image_url_square_policy, update_group_pinned_frame_url_policy, - update_message_expiration_ms_policy, + update_message_disappearing_policy, } } } @@ -257,8 +257,8 @@ impl GroupPermissions { update_group_pinned_frame_url_policy: get_policy( XmtpMetadataField::GroupPinnedFrameUrl.as_str(), ), - update_message_expiration_ms_policy: get_policy( - XmtpMetadataField::MessageExpirationMillis.as_str(), + update_message_disappearing_policy: get_policy( + XmtpMetadataField::MessageDisappearInNS.as_str(), ), }) } @@ -285,8 +285,8 @@ impl TryFrom for PolicySet { policy_set.update_group_pinned_frame_url_policy.try_into()?, ); metadata_permissions_map.insert( - XmtpMetadataField::MessageExpirationMillis.to_string(), - policy_set.update_message_expiration_ms_policy.try_into()?, + XmtpMetadataField::MessageDisappearInNS.to_string(), + policy_set.update_message_disappearing_policy.try_into()?, ); Ok(PolicySet { @@ -306,6 +306,8 @@ pub enum MetadataField { Description, ImageUrlSquare, PinnedFrameUrl, + MessageExpirationFromMS, + MessageExpirationMS, } impl From<&MetadataField> for XmtpMetadataField { @@ -315,6 +317,8 @@ impl From<&MetadataField> for XmtpMetadataField { MetadataField::Description => XmtpMetadataField::Description, MetadataField::ImageUrlSquare => XmtpMetadataField::GroupImageUrlSquare, MetadataField::PinnedFrameUrl => XmtpMetadataField::GroupPinnedFrameUrl, + MetadataField::MessageExpirationFromMS => XmtpMetadataField::MessageDisappearFromNS, + MetadataField::MessageExpirationMS => XmtpMetadataField::MessageDisappearInNS, } } } diff --git a/xmtp_mls/migrations/2025-01-16-143131_add_message_expiration_to_groups/down.sql b/xmtp_mls/migrations/2025-01-16-143131_add_message_expiration_to_groups/down.sql new file mode 100644 index 000000000..c5f2a3b25 --- /dev/null +++ b/xmtp_mls/migrations/2025-01-16-143131_add_message_expiration_to_groups/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE GROUPS DROP COLUMN message_disappear_from_ns; +ALTER TABLE GROUPS DROP COLUMN message_disappear_in_ns; \ No newline at end of file diff --git a/xmtp_mls/migrations/2025-01-16-143131_add_message_expiration_to_groups/up.sql b/xmtp_mls/migrations/2025-01-16-143131_add_message_expiration_to_groups/up.sql new file mode 100644 index 000000000..9a7c21984 --- /dev/null +++ b/xmtp_mls/migrations/2025-01-16-143131_add_message_expiration_to_groups/up.sql @@ -0,0 +1,2 @@ +ALTER TABLE GROUPS ADD COLUMN message_disappear_from_ns BIGINT; +ALTER TABLE GROUPS ADD COLUMN message_disappear_in_ns BIGINT; diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index 963af7725..586265469 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -1,4 +1,5 @@ use super::{GroupError, MlsGroup}; +use crate::groups::disappearing_messages::DisappearingMessagesCleanerWorker; #[cfg(any(test, feature = "test-utils"))] pub use crate::utils::WorkerHandle; use crate::{ @@ -138,6 +139,19 @@ where self.set_sync_worker_handle(worker.handle.clone()); worker.spawn_worker(); } + + #[instrument(level = "trace", skip_all)] + pub fn start_disappearing_messages_cleaner_worker(&self) { + let client = self.clone(); + tracing::trace!( + inbox_id = client.inbox_id(), + installation_id = hex::encode(client.installation_public_key()), + "starting expired messages cleaner worker" + ); + + let worker = DisappearingMessagesCleanerWorker::new(client); + worker.spawn_worker(); + } } pub struct SyncWorker { diff --git a/xmtp_mls/src/groups/disappearing_messages.rs b/xmtp_mls/src/groups/disappearing_messages.rs new file mode 100644 index 000000000..2a7110bb8 --- /dev/null +++ b/xmtp_mls/src/groups/disappearing_messages.rs @@ -0,0 +1,88 @@ +use crate::client::ClientError; +use crate::storage::StorageError; +use crate::Client; +use std::time::Duration; +use thiserror::Error; +use tokio::sync::OnceCell; +use xmtp_id::scw_verifier::SmartContractSignatureVerifier; +use xmtp_proto::api_client::trait_impls::XmtpApi; + +/// Restart the DisappearingMessagesCleanerWorker every 1 sec to delete the expired messages +pub const WORKER_RESTART_DELAY: Duration = Duration::from_secs(1); + +#[derive(Debug, Error)] +pub enum DisappearingMessagesCleanerError { + #[error("storage error: {0}")] + Storage(#[from] StorageError), + #[error("client error: {0}")] + Client(#[from] ClientError), +} + +pub struct DisappearingMessagesCleanerWorker { + client: Client, + #[allow(dead_code)] + init: OnceCell<()>, +} +impl DisappearingMessagesCleanerWorker +where + ApiClient: XmtpApi + Send + Sync + 'static, + V: SmartContractSignatureVerifier + Send + Sync + 'static, +{ + pub fn new(client: Client) -> Self { + Self { + client, + init: OnceCell::new(), + } + } + pub(crate) fn spawn_worker(mut self) { + crate::spawn(None, async move { + let inbox_id = self.client.inbox_id().to_string(); + let installation_id = hex::encode(self.client.installation_public_key()); + while let Err(err) = self.run().await { + tracing::info!("Running worker.."); + match err { + DisappearingMessagesCleanerError::Client(ClientError::Storage( + StorageError::PoolNeedsConnection, + )) => { + tracing::warn!( + inbox_id, + installation_id, + "Pool disconnected. task will restart on reconnect" + ); + break; + } + _ => { + tracing::error!(inbox_id, installation_id, "sync worker error {err}"); + xmtp_common::time::sleep(WORKER_RESTART_DELAY).await; + } + } + } + }); + } +} + +impl DisappearingMessagesCleanerWorker +where + ApiClient: XmtpApi + Send + Sync + 'static, + V: SmartContractSignatureVerifier + Send + Sync + 'static, +{ + /// Iterate on the list of groups and delete expired messages + async fn delete_expired_messages(&mut self) -> Result<(), DisappearingMessagesCleanerError> { + let provider = self.client.mls_provider()?; + match provider.conn_ref().delete_expired_messages() { + Ok(deleted_count) => { + tracing::info!("Successfully deleted {} expired messages", deleted_count); + } + Err(e) => { + tracing::error!("Failed to delete expired messages, error: {:?}", e); + } + } + Ok(()) + } + async fn run(&mut self) -> Result<(), DisappearingMessagesCleanerError> { + if let Err(err) = self.delete_expired_messages().await { + tracing::error!("Error during deletion of expired messages: {:?}", err); + } + Ok(()) + } +} diff --git a/xmtp_mls/src/groups/group_mutable_metadata.rs b/xmtp_mls/src/groups/group_mutable_metadata.rs index e1285a2aa..d9f51d396 100644 --- a/xmtp_mls/src/groups/group_mutable_metadata.rs +++ b/xmtp_mls/src/groups/group_mutable_metadata.rs @@ -6,18 +6,16 @@ use openmls::{ }; use prost::Message; use thiserror::Error; - use xmtp_proto::xmtp::mls::message_contents::{ GroupMutableMetadataV1 as GroupMutableMetadataProto, Inboxes as InboxesProto, }; +use super::GroupMetadataOptions; use crate::configuration::{ DEFAULT_GROUP_DESCRIPTION, DEFAULT_GROUP_IMAGE_URL_SQUARE, DEFAULT_GROUP_NAME, DEFAULT_GROUP_PINNED_FRAME_URL, MUTABLE_METADATA_EXTENSION_ID, }; -use super::GroupMetadataOptions; - /// Errors that can occur when working with GroupMutableMetadata. #[derive(Debug, Error)] pub enum GroupMutableMetadataError { @@ -47,8 +45,8 @@ pub enum MetadataField { Description, GroupImageUrlSquare, GroupPinnedFrameUrl, - MessageExpirationFromMillis, - MessageExpirationMillis, + MessageDisappearFromNS, + MessageDisappearInNS, } impl MetadataField { @@ -59,8 +57,8 @@ impl MetadataField { MetadataField::Description => "description", MetadataField::GroupImageUrlSquare => "group_image_url_square", MetadataField::GroupPinnedFrameUrl => "group_pinned_frame_url", - MetadataField::MessageExpirationFromMillis => "message_expiration_from_ms", - MetadataField::MessageExpirationMillis => "message_expiration_ms", + MetadataField::MessageDisappearFromNS => "message_disappear_from_ns", + MetadataField::MessageDisappearInNS => "message_disappear_in_ns", } } } @@ -71,6 +69,24 @@ impl fmt::Display for MetadataField { } } +/// Settings for disappearing messages in a conversation. +/// +/// # Fields +/// +/// * `from_ns` - The timestamp (in nanoseconds) from when messages should be tracked for deletion. +/// * `in_ns` - The duration (in nanoseconds) after which tracked messages will be deleted. +#[derive(Default, Debug, Clone, PartialEq)] +pub struct MessageDisappearingSettings { + pub from_ns: i64, + pub in_ns: i64, +} + +impl MessageDisappearingSettings { + pub fn new(from_ns: i64, in_ns: i64) -> Self { + Self { from_ns, in_ns } + } +} + /// Represents the mutable metadata for a group. /// /// This struct is stored as an MLS Unknown Group Context Extension. @@ -126,16 +142,14 @@ impl GroupMutableMetadata { .unwrap_or_else(|| DEFAULT_GROUP_PINNED_FRAME_URL.to_string()), ); - if let Some(message_expiration_from_ms) = opts.message_expiration_from_ms { + if let Some(message_disappearing_settings) = opts.message_disappearing_settings { attributes.insert( - MetadataField::MessageExpirationFromMillis.to_string(), - message_expiration_from_ms.to_string(), + MetadataField::MessageDisappearFromNS.to_string(), + message_disappearing_settings.from_ns.to_string(), ); - } - if let Some(message_expiration_ms) = opts.message_expiration_ms { attributes.insert( - MetadataField::MessageExpirationMillis.to_string(), - message_expiration_ms.to_string(), + MetadataField::MessageDisappearInNS.to_string(), + message_disappearing_settings.in_ns.to_string(), ); } @@ -186,7 +200,8 @@ impl GroupMutableMetadata { MetadataField::Description, MetadataField::GroupImageUrlSquare, MetadataField::GroupPinnedFrameUrl, - MetadataField::MessageExpirationMillis, + MetadataField::MessageDisappearFromNS, + MetadataField::MessageDisappearInNS, ] } diff --git a/xmtp_mls/src/groups/group_permissions.rs b/xmtp_mls/src/groups/group_permissions.rs index f15f4c6b6..1bb092db0 100644 --- a/xmtp_mls/src/groups/group_permissions.rs +++ b/xmtp_mls/src/groups/group_permissions.rs @@ -31,7 +31,7 @@ use super::{ }; use crate::configuration::{GROUP_PERMISSIONS_EXTENSION_ID, SUPER_ADMIN_METADATA_PREFIX}; use crate::groups::group_mutable_metadata::MetadataField; -use crate::groups::group_mutable_metadata::MetadataField::MessageExpirationMillis; +use crate::groups::group_mutable_metadata::MetadataField::MessageDisappearInNS; /// Errors that can occur when working with GroupMutablePermissions. #[derive(Debug, Error)] @@ -228,7 +228,7 @@ impl MetadataPolicies { pub fn default_map(policies: MetadataPolicies) -> HashMap { let mut map: HashMap = HashMap::new(); for field in GroupMutableMetadata::supported_fields() { - if field == MessageExpirationMillis { + if field == MessageDisappearInNS { map.insert(field.to_string(), MetadataPolicies::allow_if_actor_admin()); } else { map.insert(field.to_string(), policies.clone()); @@ -1158,7 +1158,7 @@ pub fn is_policy_default(policy: &PolicySet) -> Result { name: field_name.to_string(), }, )?; - if field_name == MessageExpirationMillis.as_str() { + if field_name == MessageDisappearInNS.as_str() { metadata_policies_equal = metadata_policies_equal && metadata_policy.eq(&MetadataPolicies::allow_if_actor_admin()); } else { @@ -1208,7 +1208,7 @@ pub(crate) fn default_policy() -> PolicySet { metadata_policies_map.insert(field.to_string(), MetadataPolicies::allow()); } metadata_policies_map.insert( - MessageExpirationMillis.to_string(), + MessageDisappearInNS.to_string(), MetadataPolicies::allow_if_actor_admin(), ); @@ -1231,7 +1231,7 @@ pub(crate) fn policy_admin_only() -> PolicySet { metadata_policies_map.insert(field.to_string(), MetadataPolicies::allow_if_actor_admin()); } metadata_policies_map.insert( - MetadataField::MessageExpirationMillis.to_string(), + MetadataField::MessageDisappearInNS.to_string(), MetadataPolicies::allow_if_actor_admin(), ); diff --git a/xmtp_mls/src/groups/intents.rs b/xmtp_mls/src/groups/intents.rs index 369cbad10..00e28fd86 100644 --- a/xmtp_mls/src/groups/intents.rs +++ b/xmtp_mls/src/groups/intents.rs @@ -25,6 +25,13 @@ use xmtp_proto::xmtp::mls::database::{ UpdateAdminListsData, UpdateGroupMembershipData, UpdateMetadataData, UpdatePermissionData, }; +use super::{ + group_membership::GroupMembership, + group_mutable_metadata::MetadataField, + group_permissions::{MembershipPolicies, MetadataPolicies, PermissionsPolicies}, + scoped_client::ScopedGroupClient, + GroupError, MlsGroup, +}; use crate::{ configuration::GROUP_KEY_ROTATION_INTERVAL_NS, storage::{ @@ -37,14 +44,6 @@ use crate::{ XmtpOpenMlsProvider, }; -use super::{ - group_membership::GroupMembership, - group_mutable_metadata::MetadataField, - group_permissions::{MembershipPolicies, MetadataPolicies, PermissionsPolicies}, - scoped_client::ScopedGroupClient, - GroupError, MlsGroup, -}; - #[derive(Debug, Error)] pub enum IntentError { #[error("decode error: {0}")] @@ -241,6 +240,19 @@ impl UpdateMetadataIntentData { field_value: pinned_frame_url, } } + + pub fn new_update_conversation_message_disappear_from_ns(from_ns: i64) -> Self { + Self { + field_name: MetadataField::MessageDisappearFromNS.to_string(), + field_value: from_ns.to_string(), + } + } + pub fn new_update_conversation_message_disappear_in_ns(in_ns: i64) -> Self { + Self { + field_name: MetadataField::MessageDisappearInNS.to_string(), + field_value: in_ns.to_string(), + } + } } impl From for Vec { diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 9c22bc492..b9bdf2f5f 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -8,6 +8,8 @@ use super::{ validated_commit::{extract_group_membership, CommitValidationError}, GroupError, HmacKey, MlsGroup, ScopedGroupClient, }; +use crate::groups::group_mutable_metadata::MetadataField; +use crate::storage::group_intent::IntentKind::MetadataUpdate; use crate::{ configuration::{ GRPC_DATA_LIMIT, HMAC_SALT, MAX_GROUP_SIZE, MAX_INTENT_PUBLISH_ATTEMPTS, MAX_PAST_EPOCHS, @@ -699,6 +701,7 @@ where "[{}] staged commit is valid, will attempt to merge", self.context().inbox_id() ); + mls_group.merge_staged_commit(provider, sc)?; self.save_transcript_message( provider.conn_ref(), @@ -760,13 +763,14 @@ where intent_id ); match self - .process_own_message(intent, provider, message.into(), envelope) + .process_own_message(intent.clone(), provider, message.into(), envelope) .await? { IntentState::ToPublish => { Ok(provider.conn_ref().set_group_intent_to_publish(intent_id)?) } IntentState::Committed => { + self.handle_metadata_update(provider, &intent)?; Ok(provider.conn_ref().set_group_intent_committed(intent_id)?) } IntentState::Published => { @@ -797,6 +801,35 @@ where } } + /// In case of metadataUpdate will extract the updated fields and store them to the db + fn handle_metadata_update( + &self, + provider: &XmtpOpenMlsProvider, + intent: &StoredGroupIntent, + ) -> Result<(), StorageError> { + if intent.kind == MetadataUpdate { + let data = UpdateMetadataIntentData::try_from(intent.data.clone())?; + + match data.field_name.as_str() { + field_name if field_name == MetadataField::MessageDisappearFromNS.as_str() => { + provider.conn_ref().update_message_disappearing_from_ns( + self.group_id.clone(), + data.field_value.parse::().ok(), + )? + } + field_name if field_name == MetadataField::MessageDisappearInNS.as_str() => { + provider.conn_ref().update_message_disappearing_in_ns( + self.group_id.clone(), + data.field_value.parse::().ok(), + )? + } + _ => {} // handle other metadata updates + } + } + + Ok(()) + } + #[tracing::instrument(level = "trace", skip_all)] async fn consume_message( &self, @@ -976,7 +1009,6 @@ where authority_id: content_type.authority_id.to_string(), reference_id: None, }; - msg.store_or_ignore(conn)?; Ok(Some(msg)) } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 7ec7ce91f..a75391123 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -7,6 +7,7 @@ pub mod intents; pub mod members; pub mod scoped_client; +mod disappearing_messages; pub(super) mod mls_sync; pub(super) mod subscriptions; pub mod validated_commit; @@ -107,6 +108,7 @@ use std::{collections::HashSet, sync::Arc}; use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError}; use xmtp_id::{InboxId, InboxIdRef}; +use crate::groups::group_mutable_metadata::MessageDisappearingSettings; use xmtp_common::retry::RetryableError; #[derive(Debug, Error)] @@ -293,8 +295,7 @@ pub struct GroupMetadataOptions { pub image_url_square: Option, pub description: Option, pub pinned_frame_url: Option, - pub message_expiration_from_ms: Option, - pub message_expiration_ms: Option, + pub message_disappearing_settings: Option, } impl Clone for MlsGroup { @@ -1138,6 +1139,80 @@ impl MlsGroup { } } + pub async fn update_conversation_message_disappearing_settings( + &self, + settings: MessageDisappearingSettings, + ) -> Result<(), GroupError> { + let provider = self.client.mls_provider()?; + + self.update_conversation_message_disappear_from_ns(&provider, settings.from_ns) + .await?; + self.update_conversation_message_disappear_in_ns(&provider, settings.in_ns) + .await + } + + pub async fn remove_conversation_message_disappearing_settings( + &self, + ) -> Result<(), GroupError> { + self.update_conversation_message_disappearing_settings( + MessageDisappearingSettings::default(), + ) + .await + } + + async fn update_conversation_message_disappear_from_ns( + &self, + provider: &XmtpOpenMlsProvider, + expire_from_ms: i64, + ) -> Result<(), GroupError> { + let intent_data: Vec = + UpdateMetadataIntentData::new_update_conversation_message_disappear_from_ns( + expire_from_ms, + ) + .into(); + let intent = self.queue_intent(provider, IntentKind::MetadataUpdate, intent_data)?; + self.sync_until_intent_resolved(provider, intent.id).await + } + + async fn update_conversation_message_disappear_in_ns( + &self, + provider: &XmtpOpenMlsProvider, + expire_in_ms: i64, + ) -> Result<(), GroupError> { + let intent_data: Vec = + UpdateMetadataIntentData::new_update_conversation_message_disappear_in_ns(expire_in_ms) + .into(); + let intent = self.queue_intent(provider, IntentKind::MetadataUpdate, intent_data)?; + self.sync_until_intent_resolved(provider, intent.id).await + } + + pub fn conversation_message_disappearing_settings( + &self, + provider: &XmtpOpenMlsProvider, + ) -> Result { + let mutable_metadata = self.mutable_metadata(provider)?; + let disappear_from_ns = mutable_metadata + .attributes + .get(&MetadataField::MessageDisappearFromNS.to_string()); + let disappear_in_ns = mutable_metadata + .attributes + .get(&MetadataField::MessageDisappearInNS.to_string()); + + if let (Some(Ok(message_disappear_from_ns)), Some(Ok(message_disappear_in_ns))) = ( + disappear_from_ns.map(|s| s.parse::()), + disappear_in_ns.map(|s| s.parse::()), + ) { + Ok(MessageDisappearingSettings::new( + message_disappear_from_ns, + message_disappear_in_ns, + )) + } else { + Err(GroupError::GroupMetadata( + GroupMetadataError::MissingExtension, + )) + } + } + /// Retrieves the admin list of the group from the group's mutable metadata extension. pub fn admin_list(&self, provider: &XmtpOpenMlsProvider) -> Result, GroupError> { let mutable_metadata = self.mutable_metadata(provider)?; @@ -1791,6 +1866,8 @@ pub(crate) mod tests { use xmtp_proto::xmtp::mls::api::v1::group_message::Version; use xmtp_proto::xmtp::mls::message_contents::EncodedContent; + use super::{group_permissions::PolicySet, MlsGroup}; + use crate::groups::group_mutable_metadata::MessageDisappearingSettings; use crate::storage::group::StoredGroup; use crate::storage::schema::groups; use crate::{ @@ -1817,8 +1894,6 @@ pub(crate) mod tests { InboxOwner, StreamHandle as _, }; - use super::{group_permissions::PolicySet, MlsGroup}; - async fn receive_group_invite(client: &FullXmtpClient) -> MlsGroup { client .sync_welcomes(&client.mls_provider().unwrap()) @@ -2602,6 +2677,9 @@ pub(crate) mod tests { #[wasm_bindgen_test(unsupported = tokio::test(flavor = "current_thread"))] async fn test_group_options() { + let expected_group_message_disappearing_settings = + MessageDisappearingSettings::new(100, 200); + let amal = ClientBuilder::new_test_client(&generate_local_wallet()).await; let amal_group = amal @@ -2612,8 +2690,9 @@ pub(crate) mod tests { image_url_square: Some("url".to_string()), description: Some("group description".to_string()), pinned_frame_url: Some("pinned frame".to_string()), - message_expiration_from_ms: None, - message_expiration_ms: None, + message_disappearing_settings: Some( + expected_group_message_disappearing_settings.clone(), + ), }, ) .unwrap(); @@ -2637,11 +2716,30 @@ pub(crate) mod tests { .attributes .get(&MetadataField::GroupPinnedFrameUrl.to_string()) .unwrap(); - + let amal_group_message_disappear_from_ns = binding + .attributes + .get(&MetadataField::MessageDisappearFromNS.to_string()) + .unwrap(); + let amal_group_message_disappear_in_ns = binding + .attributes + .get(&MetadataField::MessageDisappearInNS.to_string()) + .unwrap(); assert_eq!(amal_group_name, "Group Name"); assert_eq!(amal_group_image_url, "url"); assert_eq!(amal_group_description, "group description"); assert_eq!(amal_group_pinned_frame_url, "pinned frame"); + assert_eq!( + amal_group_message_disappear_from_ns.clone(), + expected_group_message_disappearing_settings + .from_ns + .to_string() + ); + assert_eq!( + amal_group_message_disappear_in_ns.clone(), + expected_group_message_disappearing_settings + .in_ns + .to_string() + ); } #[wasm_bindgen_test(unsupported = tokio::test(flavor = "current_thread"))] @@ -2904,6 +3002,68 @@ pub(crate) mod tests { assert_eq!(amal_group_pinned_frame_url, "a frame url"); } + #[wasm_bindgen_test(unsupported = tokio::test(flavor = "current_thread"))] + async fn test_update_group_message_expiration_settings() { + let amal = ClientBuilder::new_test_client(&generate_local_wallet()).await; + + // Create a group and verify it has the default group name + let policy_set = Some(PreconfiguredPolicies::AdminsOnly.to_policy_set()); + let amal_group = amal + .create_group(policy_set, GroupMetadataOptions::default()) + .unwrap(); + amal_group.sync().await.unwrap(); + + let group_mutable_metadata = amal_group + .mutable_metadata(&amal_group.mls_provider().unwrap()) + .unwrap(); + assert_eq!( + group_mutable_metadata + .attributes + .get(&MetadataField::MessageDisappearInNS.to_string()), + None + ); + assert_eq!( + group_mutable_metadata + .attributes + .get(&MetadataField::MessageDisappearFromNS.to_string()), + None + ); + + // Update group name + let expected_group_message_expiration_settings = MessageDisappearingSettings::new(100, 200); + + amal_group + .update_conversation_message_disappearing_settings( + expected_group_message_expiration_settings.clone(), + ) + .await + .unwrap(); + + // Verify amal group sees update + amal_group.sync().await.unwrap(); + let binding = amal_group + .mutable_metadata(&amal_group.mls_provider().unwrap()) + .expect("msg"); + let amal_message_expiration_from_ms: &String = binding + .attributes + .get(&MetadataField::MessageDisappearFromNS.to_string()) + .unwrap(); + let amal_message_disappear_in_ns: &String = binding + .attributes + .get(&MetadataField::MessageDisappearInNS.to_string()) + .unwrap(); + assert_eq!( + amal_message_expiration_from_ms.clone(), + expected_group_message_expiration_settings + .from_ns + .to_string() + ); + assert_eq!( + amal_message_disappear_in_ns.clone(), + expected_group_message_expiration_settings.in_ns.to_string() + ); + } + #[wasm_bindgen_test(unsupported = tokio::test(flavor = "current_thread"))] async fn test_group_mutable_data_group_permissions() { let amal = ClientBuilder::new_test_client(&generate_local_wallet()).await; diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index 6c0a9bdfd..a6d5073ec 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -51,6 +51,10 @@ pub struct StoredGroup { pub dm_id: Option, /// Timestamp of when the last message was sent for this group (updated automatically in a trigger) pub last_message_ns: Option, + /// The Time in NS when the messages should be deleted + pub message_disappear_from_ns: Option, + /// How long a message in the group can live in NS + pub message_disappear_in_ns: Option, } impl_fetch!(StoredGroup, groups, Vec); @@ -78,6 +82,8 @@ impl StoredGroup { rotated_at_ns: 0, dm_id: dm_members.map(String::from), last_message_ns: Some(now_ns()), + message_disappear_from_ns: None, + message_disappear_in_ns: None, } } @@ -103,6 +109,8 @@ impl StoredGroup { rotated_at_ns: 0, dm_id: dm_members.map(String::from), last_message_ns: Some(now_ns()), + message_disappear_from_ns: None, + message_disappear_in_ns: None, } } @@ -124,6 +132,8 @@ impl StoredGroup { rotated_at_ns: 0, dm_id: None, last_message_ns: Some(now_ns()), + message_disappear_from_ns: None, + message_disappear_in_ns: None, } } } @@ -458,6 +468,34 @@ impl DbConnection { Ok(()) } + pub fn update_message_disappearing_from_ns( + &self, + group_id: Vec, + from_ns: Option, + ) -> Result<(), StorageError> { + self.raw_query(|conn| { + diesel::update(dsl::groups.find(&group_id)) + .set(dsl::message_disappear_from_ns.eq(from_ns)) + .execute(conn) + })?; + + Ok(()) + } + + pub fn update_message_disappearing_in_ns( + &self, + group_id: Vec, + in_ns: Option, + ) -> Result<(), StorageError> { + self.raw_query(|conn| { + diesel::update(dsl::groups.find(&group_id)) + .set(dsl::message_disappear_in_ns.eq(in_ns)) + .execute(conn) + })?; + + Ok(()) + } + pub fn insert_or_replace_group(&self, group: StoredGroup) -> Result { tracing::info!("Trying to insert group"); let stored_group = self.raw_query(|conn| { diff --git a/xmtp_mls/src/storage/encrypted_store/group_message.rs b/xmtp_mls/src/storage/encrypted_store/group_message.rs index 20335e88c..1dcda5674 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_message.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_message.rs @@ -1,5 +1,5 @@ -use std::collections::HashMap; - +use diesel::dsl::sql; +use diesel::sql_types::BigInt; use diesel::{ backend::Backend, deserialize::{self, FromSql, FromSqlRow}, @@ -8,8 +8,10 @@ use diesel::{ serialize::{self, IsNull, Output, ToSql}, sql_types::Integer, }; - use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::ops::Sub; +use xmtp_common::time::now_ns; use xmtp_content_types::{ attachment, group_updated, membership_change, reaction, read_receipt, remote_attachment, reply, text, transaction_reference, @@ -426,6 +428,52 @@ impl DbConnection { .execute(conn) })?) } + + pub fn delete_expired_messages(&self) -> Result { + Ok(self.raw_query(|conn| { + use diesel::prelude::*; + let disappear_from_ns = groups_dsl::message_disappear_from_ns + .assume_not_null() + .into_sql::(); + let disappear_duration_ns = groups_dsl::message_disappear_in_ns + .assume_not_null() + .into_sql::(); + let now = now_ns(); + + let expire_messages = dsl::group_messages + .left_join( + groups_dsl::groups.on(sql::( + "lower(hex(group_messages.group_id))", + ) + .eq(sql::("lower(hex(groups.id))"))), + ) + .filter(dsl::delivery_status.eq(DeliveryStatus::Published)) + .filter(dsl::kind.eq(GroupMessageKind::Application)) + .filter( + groups_dsl::message_disappear_from_ns + .is_not_null() + .and(groups_dsl::message_disappear_in_ns.is_not_null()), + ) + .filter( + disappear_from_ns + .gt(0) // to make sure the settings are correct + .and( + dsl::sent_at_ns.gt(disappear_from_ns).and( + dsl::sent_at_ns.lt(sql::("") + .bind::(now) + .assume_not_null() + .sub(disappear_duration_ns)), + ), + ), + ) + .select(dsl::id); + let expired_message_ids = expire_messages.load::>(conn)?; + + // Then delete the rows by their IDs + diesel::delete(dsl::group_messages.filter(dsl::id.eq_any(expired_message_ids))) + .execute(conn) + })?) + } } #[cfg(test)] @@ -455,7 +503,7 @@ pub(crate) mod tests { sender_installation_id: rand_vec::<24>(), sender_inbox_id: "0x0".to_string(), kind: kind.unwrap_or(GroupMessageKind::Application), - delivery_status: DeliveryStatus::Unpublished, + delivery_status: DeliveryStatus::Published, content_type: content_type.unwrap_or(ContentType::Unknown), version_major: 0, version_minor: 0, @@ -589,6 +637,49 @@ pub(crate) mod tests { .await } + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn it_deletes_middle_message_by_expiration_time() { + with_connection(|conn| { + let mut group = generate_group(None); + + let disappear_from_ns = Some(1_000_500_000); // After Message 1 + let disappear_in_ns = Some(500_000); // Before Message 3 + group.message_disappear_from_ns = disappear_from_ns; + group.message_disappear_in_ns = disappear_in_ns; + + group.store(conn).unwrap(); + + let messages = vec![ + generate_message(None, Some(&group.id), Some(1_000_000_000), None), + generate_message(None, Some(&group.id), Some(1_001_000_000), None), + generate_message(None, Some(&group.id), Some(2_000_000_000_000_000_000), None), + ]; + assert_ok!(messages.store(conn)); + + let result = conn.delete_expired_messages().unwrap(); + assert_eq!(result, 1); // Ensure exactly 1 message is deleted + + let remaining_messages = conn + .get_group_messages( + &group.id, + &MsgQueryArgs { + ..Default::default() + }, + ) + .unwrap(); + + // Verify the count and content of the remaining messages + assert_eq!(remaining_messages.len(), 2); + assert!(remaining_messages + .iter() + .any(|msg| msg.sent_at_ns == 1_000_000_000)); // Message 1 + assert!(remaining_messages + .iter() + .any(|msg| msg.sent_at_ns == 2_000_000_000_000_000_000)); // Message 3 + }) + .await + } + #[wasm_bindgen_test(unsupported = tokio::test)] async fn it_gets_messages_by_kind() { with_connection(|conn| { diff --git a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs index f687f4fb9..2022d2441 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs @@ -63,6 +63,8 @@ diesel::table! { conversation_type -> Integer, dm_id -> Nullable, last_message_ns -> Nullable, + message_disappear_from_ns -> Nullable, + message_disappear_in_ns -> Nullable, } }