Skip to content

Commit

Permalink
base: Add state store method to fetch several presence events at once
Browse files Browse the repository at this point in the history
Signed-off-by: Kévin Commaille <[email protected]>
  • Loading branch information
zecakeh authored and jplatte committed Jun 14, 2023
1 parent b8abd1c commit 7cba6d0
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 3 deletions.
66 changes: 66 additions & 0 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub trait StateStoreIntegrationTests {
async fn test_stripped_non_stripped(&self) -> Result<()>;
/// Test room removal.
async fn test_room_removal(&self) -> Result<()>;
/// Test presence saving.
async fn test_presence_saving(&self);
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
Expand Down Expand Up @@ -1012,6 +1014,53 @@ impl StateStoreIntegrationTests for DynStateStore {
assert!(stripped_rooms.is_empty(), "still stripped room info found");
Ok(())
}

async fn test_presence_saving(&self) {
let user_id = user_id();
let second_user_id = user_id!("@second:localhost");
let third_user_id = user_id!("@third:localhost");
let unknown_user_id = user_id!("@unknown:localhost");

// No event in store.
let mut user_ids = vec![user_id.to_owned()];
let presence_event = self.get_presence_event(user_id).await;
assert!(presence_event.unwrap().is_none());
let presence_events = self.get_presence_events(&user_ids).await;
assert!(presence_events.unwrap().is_empty());

// One event in store.
let mut changes = StateChanges::default();
changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
self.save_changes(&changes).await.unwrap();

let presence_event = self.get_presence_event(user_id).await;
assert!(presence_event.unwrap().is_some());
let presence_events = self.get_presence_events(&user_ids).await;
assert_eq!(presence_events.unwrap().len(), 1);

// Several events in store.
let mut changes = StateChanges::default();
changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
self.save_changes(&changes).await.unwrap();

user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
let presence_event = self.get_presence_event(second_user_id).await;
assert!(presence_event.unwrap().is_some());
let presence_event = self.get_presence_event(third_user_id).await;
assert!(presence_event.unwrap().is_some());
let presence_events = self.get_presence_events(&user_ids).await;
assert_eq!(presence_events.unwrap().len(), 3);

// Several events in store with one unknown.
user_ids.push(unknown_user_id.to_owned());
let member_events = self.get_presence_events(&user_ids).await;
assert_eq!(member_events.unwrap().len(), 3);

// Empty user IDs list.
let presence_events = self.get_presence_events(&[]).await;
assert!(presence_events.unwrap().is_empty());
}
}

/// Macro building to allow your StateStore implementation to run the entire
Expand Down Expand Up @@ -1136,6 +1185,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await?.into_state_store();
store.test_room_removal().await
}

#[async_test]
async fn test_presence_saving() {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_presence_saving().await;
}
};
}

Expand Down Expand Up @@ -1205,3 +1260,14 @@ fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoom

Raw::new(&ev_json).unwrap().cast()
}

fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
let ev_json = json!({
"content": {
"presence": "online"
},
"sender": user_id,
});

Raw::new(&ev_json).unwrap().cast()
}
22 changes: 19 additions & 3 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,17 @@ impl MemoryStore {
Ok(())
}

async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
Ok(self.presence.get(user_id).map(|p| p.clone()))
async fn get_presence_events<'a, I>(&self, user_ids: I) -> Result<Vec<Raw<PresenceEvent>>>
where
I: IntoIterator<Item = &'a UserId>,
I::IntoIter: ExactSizeIterator,
{
let user_ids = user_ids.into_iter();
if user_ids.len() == 0 {
return Ok(Vec::new());
}

Ok(user_ids.filter_map(|user_id| self.presence.get(user_id).map(|p| p.clone())).collect())
}

async fn get_state_events(
Expand Down Expand Up @@ -597,7 +606,14 @@ impl StateStore for MemoryStore {
}

async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
self.get_presence_event(user_id).await
Ok(self.get_presence_events(iter::once(user_id)).await?.into_iter().next())
}

async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>> {
self.get_presence_events(user_ids.iter().map(AsRef::as_ref)).await
}

async fn get_state_event(
Expand Down
17 changes: 17 additions & 0 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ pub trait StateStore: AsyncTraitDeps {
user_id: &UserId,
) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;

/// Get the stored presence events for the given users.
///
/// # Arguments
///
/// * `user_ids` - The IDs of the users to fetch the presence events for.
async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;

/// Get a state event out of the state store.
///
/// # Arguments
Expand Down Expand Up @@ -388,6 +398,13 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
self.0.get_presence_event(user_id).await.map_err(Into::into)
}

async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
self.0.get_presence_events(user_ids).await.map_err(Into::into)
}

async fn get_state_event(
&self,
room_id: &RoomId,
Expand Down
29 changes: 29 additions & 0 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,35 @@ impl_state_store!({
.transpose()
}

async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>> {
if user_ids.is_empty() {
return Ok(Vec::new());
}

let txn = self
.inner
.transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?;
let store = txn.object_store(keys::PRESENCE)?;

let mut events = Vec::with_capacity(user_ids.len());

for user_id in user_ids {
if let Some(event) = store
.get(&self.encode_key(keys::PRESENCE, user_id))?
.await?
.map(|f| self.deserialize_event(&f))
.transpose()?
{
events.push(event)
}
}

Ok(events)
}

async fn get_state_event(
&self,
room_id: &RoomId,
Expand Down
29 changes: 29 additions & 0 deletions crates/matrix-sdk-sqlite/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,17 @@ trait SqliteObjectStateStoreExt: SqliteObjectExt {
.optional()?)
}

async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
let sql_params = vec!["?"; keys.len()].join(", ");
let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");

Ok(self
.prepare(sql, move |mut stmt| {
stmt.query(rusqlite::params_from_iter(keys))?.mapped(|row| row.get(0)).collect()
})
.await?)
}

async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;

async fn delete_kv_blob(&self, key: Key) -> Result<()> {
Expand Down Expand Up @@ -1110,6 +1121,24 @@ impl StateStore for SqliteStateStore {
.transpose()
}

async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>> {
if user_ids.is_empty() {
return Ok(Vec::new());
}

let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
self.acquire()
.await?
.get_kv_blobs(user_ids)
.await?
.into_iter()
.map(|data| self.deserialize_json(&data))
.collect()
}

async fn get_state_event(
&self,
room_id: &RoomId,
Expand Down

0 comments on commit 7cba6d0

Please sign in to comment.