Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(groups): add a worker to delete expired messages #1503

Merged
merged 31 commits into from
Jan 27, 2025
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
843faee
wip
mchenani Jan 14, 2025
8bd7422
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 14, 2025
9e84a95
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 14, 2025
5a994fd
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 16, 2025
b9be121
add group message expiration setting to db
mchenani Jan 16, 2025
255c6f8
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 16, 2025
22e3eff
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 17, 2025
ecd3b92
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 17, 2025
8e7835b
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 20, 2025
1dcb898
rename to ConversationMessageDisappearingSettings
mchenani Jan 20, 2025
d62cac0
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 21, 2025
354b7e6
wip
mchenani Jan 21, 2025
1b11016
extract disappearing messages worker to a separate file
mchenani Jan 21, 2025
508c792
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 21, 2025
8951e87
store metadatas into db
mchenani Jan 23, 2025
de91ebf
remove comments
mchenani Jan 23, 2025
62e2aae
wip
mchenani Jan 23, 2025
c3a3f91
add test
mchenani Jan 24, 2025
6f54733
fix tests
mchenani Jan 24, 2025
0825f6b
fix tests
mchenani Jan 24, 2025
3911e8f
address comments
mchenani Jan 24, 2025
736fc06
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 24, 2025
6bf97ed
fix clippy
mchenani Jan 27, 2025
bf2b894
fix clippy
mchenani Jan 27, 2025
e6136cb
fix clippy
mchenani Jan 27, 2025
c783def
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 27, 2025
1a527a9
fix napi and address comments
mchenani Jan 27, 2025
fbdd11f
add comments to messageDisappearingSettings
mchenani Jan 27, 2025
aeccd04
fix missing mappings
mchenani Jan 27, 2025
9ec0034
fix bindings_node
mchenani Jan 27, 2025
94f5b34
fix bindings_node
mchenani Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
store metadatas into db
mchenani committed Jan 23, 2025
commit 8951e87ef9eed58682fe9db0093fdc79aab0fe7b
28 changes: 14 additions & 14 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
@@ -790,7 +790,7 @@ pub struct FfiPermissionPolicySet {
pub update_group_description_policy: FfiPermissionPolicy,
pub update_group_image_url_square_policy: FfiPermissionPolicy,
pub update_group_pinned_frame_url_policy: FfiPermissionPolicy,
pub update_message_expiration_ms_policy: FfiPermissionPolicy,
pub update_message_disappearing_policy: FfiPermissionPolicy,
}

impl From<PreconfiguredPolicies> for FfiGroupPermissionsOptions {
@@ -826,13 +826,13 @@ impl TryFrom<FfiPermissionPolicySet> for PolicySet {
metadata_permissions_map.insert(
MetadataField::MessageDisappearFromNS.to_string(),
policy_set
.update_message_expiration_ms_policy
.update_message_disappearing_policy
.clone()
.try_into()?,
);
metadata_permissions_map.insert(
MetadataField::MessageDisappearInNS.to_string(),
policy_set.update_message_expiration_ms_policy.try_into()?,
policy_set.update_message_disappearing_policy.try_into()?,
);

Ok(PolicySet {
@@ -2263,7 +2263,7 @@ impl FfiGroupPermissions {
update_group_pinned_frame_url_policy: get_policy(
MetadataField::GroupPinnedFrameUrl.as_str(),
),
update_message_expiration_ms_policy: get_policy(
update_message_disappearing_policy: get_policy(
MetadataField::MessageDisappearInNS.as_str(),
),
})
@@ -4534,7 +4534,7 @@ mod tests {
update_group_description_policy: FfiPermissionPolicy::Admin,
update_group_image_url_square_policy: FfiPermissionPolicy::Admin,
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};
assert_eq!(alix_permission_policy_set, expected_permission_policy_set);

@@ -4564,7 +4564,7 @@ mod tests {
update_group_description_policy: FfiPermissionPolicy::Allow,
update_group_image_url_square_policy: FfiPermissionPolicy::Allow,
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Allow,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};
assert_eq!(alix_permission_policy_set, expected_permission_policy_set);
}
@@ -4595,7 +4595,7 @@ mod tests {
update_group_description_policy: FfiPermissionPolicy::Allow,
update_group_image_url_square_policy: FfiPermissionPolicy::Allow,
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Allow,
update_message_expiration_ms_policy: FfiPermissionPolicy::Allow,
update_message_disappearing_policy: FfiPermissionPolicy::Allow,
};
assert_eq!(alix_permission_policy_set, expected_permission_policy_set);

@@ -4625,7 +4625,7 @@ mod tests {
update_group_description_policy: FfiPermissionPolicy::Allow,
update_group_image_url_square_policy: FfiPermissionPolicy::Allow,
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Allow,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};
assert_eq!(alix_permission_policy_set, expected_permission_policy_set);
}
@@ -4659,7 +4659,7 @@ mod tests {
update_group_description_policy: FfiPermissionPolicy::Admin,
update_group_image_url_square_policy: FfiPermissionPolicy::Admin,
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};
assert_eq!(alix_group_permissions, expected_permission_policy_set);

@@ -4687,7 +4687,7 @@ mod tests {
update_group_description_policy: FfiPermissionPolicy::Admin,
update_group_image_url_square_policy: FfiPermissionPolicy::Allow,
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};
assert_eq!(alix_group_permissions, new_expected_permission_policy_set);

@@ -4741,7 +4741,7 @@ mod tests {
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin,
add_member_policy: FfiPermissionPolicy::Allow,
remove_member_policy: FfiPermissionPolicy::Deny,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};

let create_group_options = FfiCreateGroupOptions {
@@ -4792,7 +4792,7 @@ mod tests {
FfiPermissionPolicy::Admin
);
assert_eq!(
group_permissions_policy_set.update_message_expiration_ms_policy,
group_permissions_policy_set.update_message_disappearing_policy,
FfiPermissionPolicy::Admin
);
assert_eq!(
@@ -4858,7 +4858,7 @@ mod tests {
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin,
add_member_policy: FfiPermissionPolicy::Allow,
remove_member_policy: FfiPermissionPolicy::Deny,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};

let custom_permissions_valid = FfiPermissionPolicySet {
@@ -4870,7 +4870,7 @@ mod tests {
update_group_pinned_frame_url_policy: FfiPermissionPolicy::Admin,
add_member_policy: FfiPermissionPolicy::Allow,
remove_member_policy: FfiPermissionPolicy::Deny,
update_message_expiration_ms_policy: FfiPermissionPolicy::Admin,
update_message_disappearing_policy: FfiPermissionPolicy::Admin,
};

let create_group_options_invalid_1 = FfiCreateGroupOptions {
6 changes: 3 additions & 3 deletions bindings_node/src/permissions.rs
Original file line number Diff line number Diff line change
@@ -161,7 +161,7 @@ pub struct PermissionPolicySet {
pub update_group_description_policy: PermissionPolicy,
pub update_group_image_url_square_policy: PermissionPolicy,
pub update_group_pinned_frame_url_policy: PermissionPolicy,
pub update_message_expiration_ms_policy: PermissionPolicy,
pub update_message_disappearing_policy: PermissionPolicy,
}

impl From<PreconfiguredPolicies> for GroupPermissionsOptions {
@@ -216,7 +216,7 @@ impl GroupPermissions {
update_group_pinned_frame_url_policy: get_policy(
XmtpMetadataField::GroupPinnedFrameUrl.as_str(),
),
update_message_expiration_ms_policy: get_policy(
update_message_disappearing_policy: get_policy(
XmtpMetadataField::MessageDisappearInNS.as_str(),
),
})
@@ -247,7 +247,7 @@ impl TryFrom<PermissionPolicySet> for PolicySet {
);
metadata_permissions_map.insert(
XmtpMetadataField::MessageDisappearInNS.to_string(),
policy_set.update_message_expiration_ms_policy.try_into()?,
policy_set.update_message_disappearing_policy.try_into()?,
);

Ok(PolicySet {
12 changes: 6 additions & 6 deletions bindings_wasm/src/permissions.rs
Original file line number Diff line number Diff line change
@@ -170,8 +170,8 @@ pub struct PermissionPolicySet {
pub update_group_image_url_square_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateGroupPinnedFrameUrlPolicy)]
pub update_group_pinned_frame_url_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateMessageExpirationPolicy)]
pub update_message_expiration_ms_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateMessageDisappearingPolicy)]
pub update_message_disappearing_policy: PermissionPolicy,
}

#[wasm_bindgen]
@@ -187,7 +187,7 @@ impl PermissionPolicySet {
update_group_description_policy: PermissionPolicy,
update_group_image_url_square_policy: PermissionPolicy,
update_group_pinned_frame_url_policy: PermissionPolicy,
update_message_expiration_ms_policy: PermissionPolicy,
update_message_disappearing_policy: PermissionPolicy,
) -> Self {
Self {
add_member_policy,
@@ -198,7 +198,7 @@ impl PermissionPolicySet {
update_group_description_policy,
update_group_image_url_square_policy,
update_group_pinned_frame_url_policy,
update_message_expiration_ms_policy,
update_message_disappearing_policy,
}
}
}
@@ -257,7 +257,7 @@ impl GroupPermissions {
update_group_pinned_frame_url_policy: get_policy(
XmtpMetadataField::GroupPinnedFrameUrl.as_str(),
),
update_message_expiration_ms_policy: get_policy(
update_message_disappearing_policy: get_policy(
XmtpMetadataField::MessageDisappearInNS.as_str(),
),
})
@@ -286,7 +286,7 @@ impl TryFrom<PermissionPolicySet> for PolicySet {
);
metadata_permissions_map.insert(
XmtpMetadataField::MessageDisappearInNS.to_string(),
policy_set.update_message_expiration_ms_policy.try_into()?,
policy_set.update_message_disappearing_policy.try_into()?,
);

Ok(PolicySet {
15 changes: 9 additions & 6 deletions xmtp_mls/src/groups/group_mutable_metadata.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ use openmls::{
};
use prost::Message;
use thiserror::Error;

use xmtp_proto::xmtp::mls::message_contents::{
GroupMutableMetadataV1 as GroupMutableMetadataProto, Inboxes as InboxesProto,
};
@@ -58,8 +57,8 @@ impl MetadataField {
MetadataField::Description => "description",
MetadataField::GroupImageUrlSquare => "group_image_url_square",
MetadataField::GroupPinnedFrameUrl => "group_pinned_frame_url",
MetadataField::MessageDisappearFromNS => "message_expiration_from_ms",
MetadataField::MessageDisappearInNS => "message_expiration_ms",
MetadataField::MessageDisappearFromNS => "message_disappear_from_ns",
MetadataField::MessageDisappearInNS => "message_disappear_in_ns",
}
}
}
@@ -143,14 +142,18 @@ impl GroupMutableMetadata {
.unwrap_or_else(|| DEFAULT_GROUP_PINNED_FRAME_URL.to_string()),
);

if let Some(message_retention_settings) = opts.message_disappearing_settings {
if let Some(message_disappearing_settings) = opts.message_disappearing_settings {
println!(
"message_disappearing_setting:{:?}",
message_disappearing_settings.clone()
);
attributes.insert(
MetadataField::MessageDisappearFromNS.to_string(),
message_retention_settings.from_ns.to_string(),
message_disappearing_settings.from_ns.to_string(),
);
attributes.insert(
MetadataField::MessageDisappearInNS.to_string(),
message_retention_settings.in_ns.to_string(),
message_disappearing_settings.in_ns.to_string(),
);
}

35 changes: 33 additions & 2 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@ use super::{
validated_commit::{extract_group_membership, CommitValidationError},
GroupError, HmacKey, MlsGroup, ScopedGroupClient,
};
use crate::groups::group_mutable_metadata::MetadataField;
use crate::storage::group_intent::IntentKind::MetadataUpdate;
use crate::{
configuration::{
GRPC_DATA_LIMIT, HMAC_SALT, MAX_GROUP_SIZE, MAX_INTENT_PUBLISH_ATTEMPTS, MAX_PAST_EPOCHS,
@@ -761,15 +763,15 @@ where
intent_id
);
match self
.process_own_message(intent, provider, message.into(), envelope)
.process_own_message(intent.clone(), provider, message.into(), envelope)
.await?
{
IntentState::ToPublish => {
Ok(provider.conn_ref().set_group_intent_to_publish(intent_id)?)
}
IntentState::Committed => {
self.handle_metadata_update(&provider, &intent)?;
Ok(provider.conn_ref().set_group_intent_committed(intent_id)?)
//todo: update the group based on the intent here?
}
IntentState::Published => {
tracing::error!("Unexpected behaviour: returned intent state published from process_own_message");
@@ -799,6 +801,35 @@ where
}
}

/// In case of metadataUpdate will extract the updated fields and store them to the db
fn handle_metadata_update(
&self,
provider: &XmtpOpenMlsProvider,
intent: &StoredGroupIntent,
) -> Result<(), StorageError> {
if intent.kind == MetadataUpdate {
let data = UpdateMetadataIntentData::try_from(intent.data.clone())?;

match data.field_name.as_str() {
field_name if field_name == MetadataField::MessageDisappearFromNS.as_str() => {
provider.conn_ref().update_message_disappearing_from_ns(
self.group_id.clone(),
data.field_value.parse::<i64>().ok(),
)?
}
field_name if field_name == MetadataField::MessageDisappearInNS.as_str() => {
provider.conn_ref().update_message_disappearing_in_ns(
self.group_id.clone(),
data.field_value.parse::<i64>().ok(),
)?
}
_ => {} // handle other metadata updates
}
}

Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
async fn consume_message(
&self,
18 changes: 9 additions & 9 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
@@ -1193,20 +1193,20 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
provider: &XmtpOpenMlsProvider,
) -> Result<ConversationMessageDisappearingSettings, GroupError> {
let mutable_metadata = self.mutable_metadata(provider)?;
let expire_from_ms = mutable_metadata
let disappear_from_ns = mutable_metadata
.attributes
.get(&MetadataField::MessageDisappearFromNS.to_string());
let expire_in_ms = mutable_metadata
let disappear_in_ns = mutable_metadata
.attributes
.get(&MetadataField::MessageDisappearInNS.to_string());

if let (Some(Ok(message_expiration_from_ms)), Some(Ok(message_expiration_ms))) = (
expire_from_ms.map(|s| s.parse::<i64>()),
expire_in_ms.map(|s| s.parse::<i64>()),
if let (Some(Ok(message_disappear_from_ns)), Some(Ok(message_disappear_in_ns))) = (
disappear_from_ns.map(|s| s.parse::<i64>()),
disappear_in_ns.map(|s| s.parse::<i64>()),
) {
Ok(ConversationMessageDisappearingSettings::new(
message_expiration_from_ms,
message_expiration_ms,
message_disappear_from_ns,
message_disappear_in_ns,
))
} else {
Err(GroupError::GroupMetadata(
@@ -3064,7 +3064,7 @@ pub(crate) mod tests {
.attributes
.get(&MetadataField::MessageDisappearFromNS.to_string())
.unwrap();
let amal_message_expiration_ms: &String = binding
let amal_message_disappear_in_ns: &String = binding
.attributes
.get(&MetadataField::MessageDisappearInNS.to_string())
.unwrap();
@@ -3075,7 +3075,7 @@ pub(crate) mod tests {
.to_string()
);
assert_eq!(
amal_message_expiration_ms.clone(),
amal_message_disappear_in_ns.clone(),
expected_group_message_expiration_settings.in_ns.to_string()
);
}
28 changes: 28 additions & 0 deletions xmtp_mls/src/storage/encrypted_store/group.rs
Original file line number Diff line number Diff line change
@@ -468,6 +468,34 @@ impl DbConnection {
Ok(())
}

pub fn update_message_disappearing_from_ns(
&self,
group_id: Vec<u8>,
from_ns: Option<i64>,
) -> Result<(), StorageError> {
self.raw_query(|conn| {
diesel::update(dsl::groups.find(&group_id))
.set(dsl::message_disappear_from_ns.eq(from_ns))
.execute(conn)
})?;

Ok(())
}

pub fn update_message_disappearing_in_ns(
&self,
group_id: Vec<u8>,
in_ns: Option<i64>,
) -> Result<(), StorageError> {
self.raw_query(|conn| {
diesel::update(dsl::groups.find(&group_id))
.set(dsl::message_disappear_in_ns.eq(in_ns))
.execute(conn)
})?;

Ok(())
}

pub fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError> {
tracing::info!("Trying to insert group");
let stored_group = self.raw_query(|conn| {