From 4d3ca15be3488af6f9e9565dea692d19d27696b4 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 19 Jun 2023 10:56:58 +0200 Subject: [PATCH] feat(ui): Implement `Notification` API (#2023) This is the first PR for splitting the sync loop into two. This offers a new high-level API, `NotificationApi`, that makes use of a separate `SlidingSync` instance which sole role is to listen to to-device events and e2ee; it's pre-configured to do so. That means we're not force-enabling e2ee and to-device by default for every sliding sync instance, and as such we won't either generate Olm requests to the home server in general. In the future, this new high-level API will hide some low-level dirty details so that its can be instantiated in multiple processes at the same time (lock across process, invalidate and refill crypto caches, etc.). An embedder who would want to make use of this would need the following: - a main sliding sync instance, without e2ee and to-device. Using the `matrix_sdk_ui::RoomList` would be the best bet, at this time. - an instance of this `matrix_sdk_ui::NotificationApi`, with a different identifier. Note that this is not ready to be used in an external process; or it will cause the same kind of issues that we're seeing as of today: invalid crypto caches resulting in UTD, etc. Fixes https://github.com/matrix-org/matrix-rust-sdk/issues/1961. --- .../matrix-sdk-crypto-nodejs/src/machine.rs | 2 +- bindings/matrix-sdk-ffi/Cargo.toml | 2 +- bindings/matrix-sdk-ffi/src/error.rs | 7 + bindings/matrix-sdk-ffi/src/sliding_sync.rs | 65 +++++++- crates/matrix-sdk-ui/Cargo.toml | 3 +- crates/matrix-sdk-ui/src/lib.rs | 2 + crates/matrix-sdk-ui/src/notifications/mod.rs | 117 ++++++++++++++ crates/matrix-sdk-ui/src/room_list/mod.rs | 5 +- .../matrix-sdk-ui/tests/integration/main.rs | 3 + .../tests/integration/notification.rs | 130 ++++++++++++++++ .../tests/integration/room_list.rs | 143 +++++++----------- .../tests/integration/sliding_sync.rs | 99 ++++++++++++ crates/matrix-sdk/src/sliding_sync/builder.rs | 14 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 58 ++++--- 14 files changed, 528 insertions(+), 122 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/notifications/mod.rs create mode 100644 crates/matrix-sdk-ui/tests/integration/notification.rs create mode 100644 crates/matrix-sdk-ui/tests/integration/sliding_sync.rs diff --git a/bindings/matrix-sdk-crypto-nodejs/src/machine.rs b/bindings/matrix-sdk-crypto-nodejs/src/machine.rs index b458d76cfdb..1d3d92807c9 100644 --- a/bindings/matrix-sdk-crypto-nodejs/src/machine.rs +++ b/bindings/matrix-sdk-crypto-nodejs/src/machine.rs @@ -152,7 +152,7 @@ impl OlmMachine { #[napi(constructor)] pub fn new() -> napi::Result { Err(napi::Error::from_reason( - "To build an `OldMachine`, please use the `initialize` method", + "To build an `OlmMachine`, please use the `initialize` method", )) } diff --git a/bindings/matrix-sdk-ffi/Cargo.toml b/bindings/matrix-sdk-ffi/Cargo.toml index c8369181589..4344e81c102 100644 --- a/bindings/matrix-sdk-ffi/Cargo.toml +++ b/bindings/matrix-sdk-ffi/Cargo.toml @@ -29,7 +29,7 @@ eyeball-im = { workspace = true } extension-trait = "1.0.1" futures-core = { workspace = true } futures-util = { workspace = true } -matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list"] } +matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list", "experimental-notification"] } mime = "0.3.16" # FIXME: we currently can't feature flag anything in the api.udl, therefore we must enforce experimental-sliding-sync being exposed here.. # see https://github.com/matrix-org/matrix-rust-sdk/issues/1014 diff --git a/bindings/matrix-sdk-ffi/src/error.rs b/bindings/matrix-sdk-ffi/src/error.rs index 0e9f802f3b2..a086f731691 100644 --- a/bindings/matrix-sdk-ffi/src/error.rs +++ b/bindings/matrix-sdk-ffi/src/error.rs @@ -1,6 +1,7 @@ use std::fmt::Display; use matrix_sdk::{self, encryption::CryptoStoreError, HttpError, IdParseError, StoreError}; +use matrix_sdk_ui::notifications; #[derive(Debug, thiserror::Error)] pub enum ClientError { @@ -68,6 +69,12 @@ impl From for ClientError { } } +impl From for ClientError { + fn from(e: notifications::Error) -> Self { + Self::new(e) + } +} + #[derive(Debug, thiserror::Error, uniffi::Error)] #[uniffi(flat_error)] pub enum RoomError { diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 77c53c07563..5841b2d5a4f 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -15,7 +15,9 @@ use matrix_sdk::{ LoopCtrl, RoomListEntry as MatrixRoomEntry, SlidingSyncBuilder as MatrixSlidingSyncBuilder, SlidingSyncListLoadingState, SlidingSyncMode, }; -use matrix_sdk_ui::timeline::SlidingSyncRoomExt; +use matrix_sdk_ui::{ + notifications::NotificationSync as MatrixNotificationSync, timeline::SlidingSyncRoomExt, +}; use tracing::{error, warn}; use crate::{ @@ -869,6 +871,55 @@ impl SlidingSyncBuilder { } } +#[uniffi::export(callback_interface)] +pub trait NotificationSyncListener: Sync + Send { + /// Called whenever the notification sync loop terminates, and must be + /// restarted. + fn did_terminate(&self); +} + +/// Full context for the notification sync loop. +#[derive(uniffi::Object)] +pub struct NotificationSync { + /// Unused field, kept for its `Drop` semantics. + _handle: TaskHandle, +} + +impl NotificationSync { + fn start( + notification: MatrixNotificationSync, + listener: Box, + ) -> TaskHandle { + TaskHandle::new(RUNTIME.spawn(async move { + let stream = notification.sync(); + pin_mut!(stream); + + loop { + match stream.next().await { + Some(Ok(())) => { + // Yay. + } + + None => { + warn!("Notification sliding sync ended"); + break; + } + + Some(Err(err)) => { + // The internal sliding sync instance already handles retries for us, so if + // we get an error here, it means the maximum number of retries has been + // reached, and there's not much we can do anymore. + warn!("Error when handling notifications: {err}"); + break; + } + } + } + + listener.did_terminate(); + })) + } +} + #[uniffi::export] impl Client { /// Creates a new Sliding Sync instance with the given identifier. @@ -878,4 +929,16 @@ impl Client { let inner = self.inner.sliding_sync(id)?; Ok(Arc::new(SlidingSyncBuilder { inner, client: self.clone() })) } + + pub fn notification_sliding_sync( + &self, + id: String, + listener: Box, + ) -> Result, ClientError> { + RUNTIME.block_on(async move { + let inner = MatrixNotificationSync::new(id, self.inner.clone()).await?; + let handle = NotificationSync::start(inner, listener); + Ok(Arc::new(NotificationSync { _handle: handle })) + }) + } } diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index fe664e34d93..3ae90d905ca 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -4,7 +4,7 @@ version = "0.6.0" edition = "2021" [features] -default = ["e2e-encryption", "native-tls", "experimental-room-list"] +default = ["e2e-encryption", "native-tls", "experimental-room-list", "experimental-notification"] e2e-encryption = ["matrix-sdk/e2e-encryption"] @@ -12,6 +12,7 @@ native-tls = ["matrix-sdk/native-tls"] rustls-tls = ["matrix-sdk/rustls-tls"] experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"] +experimental-notification = ["experimental-sliding-sync", "dep:async-stream"] experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"] testing = ["dep:eyeball-im-util"] diff --git a/crates/matrix-sdk-ui/src/lib.rs b/crates/matrix-sdk-ui/src/lib.rs index a08debc48f6..831489187c4 100644 --- a/crates/matrix-sdk-ui/src/lib.rs +++ b/crates/matrix-sdk-ui/src/lib.rs @@ -14,6 +14,8 @@ mod events; +#[cfg(feature = "experimental-notification")] +pub mod notifications; #[cfg(feature = "experimental-room-list")] pub mod room_list; pub mod timeline; diff --git a/crates/matrix-sdk-ui/src/notifications/mod.rs b/crates/matrix-sdk-ui/src/notifications/mod.rs new file mode 100644 index 00000000000..6ede78ad241 --- /dev/null +++ b/crates/matrix-sdk-ui/src/notifications/mod.rs @@ -0,0 +1,117 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for that specific language governing permissions and +// limitations under the License. + +//! Notification API. +//! +//! The notification API is a high-level helper that is designed to take care of +//! handling the synchronization of notifications, be they received within the +//! app or within a dedicated notification process (e.g. the [NSE] process on +//! iOS devices). +//! +//! Under the hood, this uses a sliding sync instance configured with no lists, +//! but that enables the e2ee and to-device extensions, so that it can both +//! handle encryption and manage encryption keys; that's sufficient to decrypt +//! messages received in the notification processes. +//! +//! As this may be used across different processes, this also makes sure that +//! there's only one process writing to the databases holding encryption +//! information. TODO as of 2023-06-06, this hasn't been done yet. +//! +//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension + +use async_stream::stream; +use futures_core::stream::Stream; +use futures_util::{pin_mut, StreamExt}; +use matrix_sdk::{Client, SlidingSync}; +use ruma::{api::client::sync::sync_events::v4, assign}; +use tracing::error; + +/// High-level helper for synchronizing notifications using sliding sync. +/// +/// See the module's documentation for more details. +#[derive(Clone)] +pub struct NotificationSync { + sliding_sync: SlidingSync, +} + +impl NotificationSync { + /// Creates a new instance of a `NotificationSync`. + /// + /// This will create and manage an instance of [`matrix_sdk::SlidingSync`]. + /// The `id` is used as the identifier of that instance, as such make + /// sure to not reuse a name used by another Sliding Sync instance, at + /// the risk of causing problems. + pub async fn new(id: impl Into, client: Client) -> Result { + let sliding_sync = client + .sliding_sync(id)? + .enable_caching()? + .with_to_device_extension( + assign!(v4::ToDeviceConfig::default(), { enabled: Some(true)}), + ) + .with_e2ee_extension(assign!(v4::E2EEConfig::default(), { enabled: Some(true)})) + .build() + .await?; + + Ok(Self { sliding_sync }) + } + + /// Start synchronization of notifications. + /// + /// This should be regularly polled, so as to ensure that the notifications + /// are sync'd. + pub fn sync(&self) -> impl Stream> + '_ { + stream!({ + let sync = self.sliding_sync.sync(); + + pin_mut!(sync); + + loop { + match sync.next().await { + Some(Ok(update_summary)) => { + // This API is only concerned with the e2ee and to-device extensions. + // Warn if anything weird has been received from the proxy. + if !update_summary.lists.is_empty() || !update_summary.rooms.is_empty() { + yield Err(Error::UnexpectedNonEmptyListsOrRooms); + } else { + // Cool cool, let's do it again. + yield Ok(()); + } + + continue; + } + + Some(Err(err)) => { + yield Err(err.into()); + + break; + } + + None => { + break; + } + } + } + }) + } +} + +/// Errors for the [`NotificationSync`]. +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Unexpected rooms or lists in the sliding sync response.")] + UnexpectedNonEmptyListsOrRooms, + + #[error("Something wrong happened in sliding sync: {0:#}")] + SlidingSyncError(#[from] matrix_sdk::Error), +} diff --git a/crates/matrix-sdk-ui/src/room_list/mod.rs b/crates/matrix-sdk-ui/src/room_list/mod.rs index 6b7d9975af3..0e06c5ac229 100644 --- a/crates/matrix-sdk-ui/src/room_list/mod.rs +++ b/crates/matrix-sdk-ui/src/room_list/mod.rs @@ -77,7 +77,7 @@ use matrix_sdk::{ }; pub use room::*; use ruma::{ - api::client::sync::sync_events::v4::SyncRequestListFilters, + api::client::sync::sync_events::v4::{E2EEConfig, SyncRequestListFilters, ToDeviceConfig}, assign, events::{StateEventType, TimelineEventType}, OwnedRoomId, RoomId, @@ -104,6 +104,9 @@ impl RoomList { .enable_caching() .map_err(Error::SlidingSync)? .with_common_extensions() + // TODO different strategy when the encryption sync is in main by default + .with_e2ee_extension(assign!(E2EEConfig::default(), { enabled: Some(true) })) + .with_to_device_extension(assign!(ToDeviceConfig::default(), { enabled: Some(true) })) // TODO revert to `add_cached_list` when reloading rooms from the cache is blazingly // fast .add_list( diff --git a/crates/matrix-sdk-ui/tests/integration/main.rs b/crates/matrix-sdk-ui/tests/integration/main.rs index a507258ba60..4489ad5933a 100644 --- a/crates/matrix-sdk-ui/tests/integration/main.rs +++ b/crates/matrix-sdk-ui/tests/integration/main.rs @@ -21,8 +21,11 @@ use wiremock::{ Mock, MockServer, ResponseTemplate, }; +#[cfg(feature = "experimental-notification")] +mod notification; #[cfg(feature = "experimental-room-list")] mod room_list; +mod sliding_sync; mod timeline; #[cfg(all(test, not(target_arch = "wasm32")))] diff --git a/crates/matrix-sdk-ui/tests/integration/notification.rs b/crates/matrix-sdk-ui/tests/integration/notification.rs new file mode 100644 index 00000000000..7a2c4df11c4 --- /dev/null +++ b/crates/matrix-sdk-ui/tests/integration/notification.rs @@ -0,0 +1,130 @@ +use futures_util::{pin_mut, StreamExt as _}; +use matrix_sdk_test::async_test; +use matrix_sdk_ui::notifications::NotificationSync; + +use crate::{logged_in_client, sliding_sync_then_assert_request_and_fake_response}; + +#[async_test] +async fn test_smoke_test_notification_api() -> anyhow::Result<()> { + let (client, server) = logged_in_client().await; + + let notification_api = NotificationSync::new("notifs".to_owned(), client).await?; + + let notification_stream = notification_api.sync(); + pin_mut!(notification_stream); + + // Requests enable the e2ee and to_device extensions on the first run. + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + "extensions": { + "e2ee": { + "enabled": true + }, + "to_device": { + "enabled": true + } + } + }, + respond with = { + "pos": "0" + }, + }; + + // The request then passes the `pos`ition marker to the next request, as usual + // in sliding sync. The extensions haven't changed, so they're not updated + // (sticky parameters ftw). + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + }, + respond with = { + "pos": "1", + "extensions": { + "to_device": { + "next_batch": "nb0" + } + } + }, + }; + + // The to-device since token is passed from the previous request. + // The extensions haven't changed, so they're not updated (sticky parameters + // ftw). + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + "extensions": { + "to_device": { + "since": "nb0" + } + } + }, + respond with = { + "pos": "2", + "extensions": { + "to_device": { + "next_batch": "nb1" + } + } + }, + }; + + // The to-device since token is passed from the previous request. + // The extensions haven't changed, so they're not updated (sticky parameters + // ftw)... in the first request. Then, the sliding sync instance will retry + // those requests, so it will include them again; as a matter of fact, the + // last request that we assert against will contain those. + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + sync matches Some(Err(_)), + assert request = { + "conn_id": "notifs", + "extensions": { + "e2ee": { + "enabled": true, + }, + "to_device": { + "enabled": true, + "since": "nb1" + } + } + }, + respond with = (code 400) { + "error": "foo", + "errcode": "M_UNKNOWN_POS", + }, + }; + + // The notification stream will stop, as it ran into an error. + assert!(notification_stream.next().await.is_none()); + + // Start a new sync. + let notification_stream = notification_api.sync(); + pin_mut!(notification_stream); + + // The next request will contain sticky parameters again. + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + "extensions": { + "e2ee": { + "enabled": true + }, + "to_device": { + "enabled": true, + "since": "nb1" + } + } + }, + respond with = { + "pos": "a" + }, + }; + + Ok(()) +} diff --git a/crates/matrix-sdk-ui/tests/integration/room_list.rs b/crates/matrix-sdk-ui/tests/integration/room_list.rs index b5f51b90ecb..4fc4347e2ce 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list.rs @@ -21,7 +21,7 @@ use ruma::{ }; use serde_json::json; use stream_assert::{assert_next_eq, assert_pending}; -use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate}; +use wiremock::MockServer; use crate::{ logged_in_client, @@ -35,52 +35,34 @@ async fn new_room_list() -> Result<(MockServer, RoomList), Error> { Ok((server, room_list)) } -#[derive(Copy, Clone)] -struct SlidingSyncMatcher; - -impl Match for SlidingSyncMatcher { - fn matches(&self, request: &Request) -> bool { - request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" - && request.method == Method::Post - } -} - +// Same macro as in the main, with additional checking that the state +// before/after the sync loop match those we expect. macro_rules! sync_then_assert_request_and_fake_response { ( - [$server:ident, $room_list:ident, $room_list_sync_stream:ident] + [$server:ident, $room_list:ident, $stream:ident] $( states = $pre_state:pat => $post_state:pat, )? - assert request = { $( $request_json:tt )* }, + assert request >= { $( $request_json:tt )* }, respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } $(,)? ) => { sync_then_assert_request_and_fake_response! { - [$server, $room_list, $room_list_sync_stream] + [$server, $room_list, $stream] sync matches Some(Ok(_)), $( states = $pre_state => $post_state, )? - assert request = { $( $request_json )* }, + assert request >= { $( $request_json )* }, respond with = $( ( code $code ) )? { $( $response_json )* }, } }; ( - [$server:ident, $room_list:ident, $room_list_sync_stream:ident] + [$server:ident, $room_list:ident, $stream:ident] sync matches $sync_result:pat, $( states = $pre_state:pat => $post_state:pat, )? - assert request = { $( $request_json:tt )* }, + assert request >= { $( $request_json:tt )* }, respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } $(,)? ) => { { - let _code = 200; - $( let _code = $code; )? - - let _mock_guard = Mock::given(SlidingSyncMatcher) - .respond_with(ResponseTemplate::new(_code).set_body_json( - json!({ $( $response_json )* }) - )) - .mount_as_scoped(&$server) - .await; - $( use State::*; @@ -89,26 +71,12 @@ macro_rules! sync_then_assert_request_and_fake_response { assert_matches!(state.get(), $pre_state, "pre state"); )? - let next = $room_list_sync_stream.next().await; - - assert_matches!(next, $sync_result, "sync's result"); - - for request in $server.received_requests().await.expect("Request recording has been disabled").iter().rev() { - if SlidingSyncMatcher.matches(request) { - let json_value = serde_json::from_slice::(&request.body).unwrap(); - - if let Err(error) = assert_json_diff::assert_json_matches_no_panic( - &json_value, - &json!({ $( $request_json )* }), - assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive), - ) { - dbg!(json_value); - panic!("{}", error); - } - - break; - } - } + let next = super::sliding_sync_then_assert_request_and_fake_response! { + [$server, $stream] + sync matches $sync_result, + assert request >= { $( $request_json )* }, + respond with = $( ( code $code ) )? { $( $response_json )* }, + }; $( assert_matches!(state.next().now_or_never(), Some(Some($post_state)), "post state"); )? @@ -247,7 +215,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [ @@ -310,13 +278,12 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [ [0, 49], ], - "timeline_limit": 1, }, VISIBLE_ROOMS: { "ranges": [[0, 19]], @@ -383,7 +350,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 99]], @@ -425,7 +392,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = CarryOn => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 149]], @@ -467,7 +434,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = CarryOn => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 199]], @@ -525,7 +492,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -553,7 +520,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -595,7 +562,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -644,7 +611,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = Init => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // The default range, in selective sync-mode. @@ -669,7 +636,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // Still the default range, in selective sync-mode. @@ -693,7 +660,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = FirstRooms => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // In `FirstRooms`, the sync-mode has changed to growing, with @@ -730,7 +697,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // In `AllRooms`, the sync-mode is still growing, but the range @@ -766,7 +733,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = AllRooms => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // In `AllRooms`, the sync-mode is still growing, and the range @@ -800,7 +767,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // Due to the error, the range is reset to its initial value. @@ -835,7 +802,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = CarryOn => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // No error. The range is making progress. @@ -867,7 +834,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = CarryOn => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // Range is making progress and is even reaching the maximum @@ -901,7 +868,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // An error was received at the previous sync iteration. @@ -946,7 +913,7 @@ async fn test_entries_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1004,7 +971,7 @@ async fn test_entries_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -1079,7 +1046,7 @@ async fn test_entries_stream_with_updated_filter() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1133,7 +1100,7 @@ async fn test_entries_stream_with_updated_filter() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -1220,7 +1187,7 @@ async fn test_invites_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1246,7 +1213,7 @@ async fn test_invites_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 0]], @@ -1306,7 +1273,7 @@ async fn test_invites_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 0]], @@ -1375,7 +1342,7 @@ async fn test_room() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": { @@ -1415,7 +1382,7 @@ async fn test_room() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "1", "lists": { @@ -1475,7 +1442,7 @@ async fn test_room_subscription() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1533,7 +1500,7 @@ async fn test_room_subscription() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 2]], @@ -1565,7 +1532,7 @@ async fn test_room_subscription() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 2]], @@ -1594,7 +1561,7 @@ async fn test_room_unread_notifications() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1634,7 +1601,7 @@ async fn test_room_unread_notifications() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 0]], @@ -1681,7 +1648,7 @@ async fn test_room_timeline() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": { @@ -1715,7 +1682,7 @@ async fn test_room_timeline() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": {}, @@ -1765,7 +1732,7 @@ async fn test_room_latest_event() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": { @@ -1796,7 +1763,7 @@ async fn test_room_latest_event() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": {}, @@ -1820,7 +1787,7 @@ async fn test_room_latest_event() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": {}, @@ -1862,7 +1829,7 @@ async fn test_input_viewport() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1879,7 +1846,7 @@ async fn test_input_viewport() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 49]], @@ -1902,17 +1869,17 @@ async fn test_input_viewport() -> Result<(), Error> { assert!(room_list.apply_input(Input::Viewport(vec![10..=15, 20..=25])).await.is_ok()); + // The `timeline_limit` is not repeated because it's sticky. sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 49]], }, VISIBLE_ROOMS: { "ranges": [[10, 15], [20, 25]], - "timeline_limit": 20, }, INVITES: { "ranges": [[0, 99]], diff --git a/crates/matrix-sdk-ui/tests/integration/sliding_sync.rs b/crates/matrix-sdk-ui/tests/integration/sliding_sync.rs new file mode 100644 index 00000000000..fd3c019bb1f --- /dev/null +++ b/crates/matrix-sdk-ui/tests/integration/sliding_sync.rs @@ -0,0 +1,99 @@ +//! Helpers for integration tests involving sliding sync. + +use wiremock::{http::Method, Match, Request}; + +pub(crate) struct SlidingSyncMatcher; + +#[derive(serde::Deserialize)] +pub(crate) struct PartialSlidingSyncRequest { + pub txn_id: Option, +} + +impl Match for SlidingSyncMatcher { + fn matches(&self, request: &Request) -> bool { + request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" + && request.method == Method::Post + } +} + +/// Run a single sliding sync request, checking that the request is a subset of +/// what we expect it to be, and providing the given next response. +#[macro_export] +macro_rules! sliding_sync_then_assert_request_and_fake_response { + ( + [$server:ident, $stream:ident] + assert request $sign:tt { $( $request_json:tt )* }, + respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } + $(,)? + ) => { + sliding_sync_then_assert_request_and_fake_response! { + [$server, $stream] + sync matches Some(Ok(_)), + assert request $sign { $( $request_json )* }, + respond with = $( ( code $code ) )? { $( $response_json )* }, + } + }; + + ( + [$server:ident, $stream:ident] + sync matches $sync_result:pat, + assert request $sign:tt { $( $request_json:tt )* }, + respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } + $(,)? + ) => { + { + use $crate::sliding_sync::{SlidingSyncMatcher, PartialSlidingSyncRequest}; + use wiremock::{Mock, ResponseTemplate, Match as _, Request}; + use assert_matches::assert_matches; + use serde_json::json; + + let _code = 200; + $( let _code = $code; )? + + let _mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(move |request: &Request| { + let partial_request: PartialSlidingSyncRequest = request.body_json().unwrap(); + // Repeat the transaction id in the response, to validate sticky parameters. + ResponseTemplate::new(_code).set_body_json( + json!({ + "txn_id": partial_request.txn_id, + $( $response_json )* + }) + ) + }) + .mount_as_scoped(&$server) + .await; + + let next = $stream.next().await; + + assert_matches!(next, $sync_result, "sync's result"); + + for request in $server.received_requests().await.expect("Request recording has been disabled").iter().rev() { + if SlidingSyncMatcher.matches(request) { + let mut json_value = serde_json::from_slice::(&request.body).unwrap(); + + // Strip the transaction id, if present. + if let Some(root) = json_value.as_object_mut() { + root.remove("txn_id"); + } + + if let Err(error) = assert_json_diff::assert_json_matches_no_panic( + &json_value, + &json!({ $( $request_json )* }), + $crate::sliding_sync_then_assert_request_and_fake_response!(@assertion_config $sign) + ) { + dbg!(json_value); + panic!("{}", error); + } + + break; + } + } + + next + } + }; + + (@assertion_config >=) => { assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive) }; + (@assertion_config =) => { assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict) }; +} diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 48c6b387f13..82faf5c294f 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -270,15 +270,10 @@ impl SlidingSyncBuilder { // auto-discovered by the client, if any. let sliding_sync_proxy = self.sliding_sync_proxy.or_else(|| client.sliding_sync_proxy()); - // Always enable to-device events and the e2ee-extension on the initial request, - // no matter what the caller wants. - let mut extensions = self.extensions.unwrap_or_default(); - extensions.to_device.enabled = Some(true); - extensions.e2ee.enabled = Some(true); - Ok(SlidingSync::new(SlidingSyncInner { - _id: Some(self.id), + id: self.id, sliding_sync_proxy, + client, storage_key: self.storage_key, @@ -294,7 +289,10 @@ impl SlidingSyncBuilder { }), sticky: StdRwLock::new(SlidingSyncStickyManager::new( - SlidingSyncStickyParameters::new(self.subscriptions, extensions), + SlidingSyncStickyParameters::new( + self.subscriptions, + self.extensions.unwrap_or_default(), + ), )), room_unsubscriptions: Default::default(), diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index bb8dc0ed52f..1b3e0166a09 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -85,7 +85,7 @@ pub(super) struct SlidingSyncInner { /// A unique identifier for this instance of sliding sync. /// /// Used to distinguish different connections to the sliding sync proxy. - _id: Option, + id: String, /// Customize the sliding sync proxy URL. sliding_sync_proxy: Option, @@ -410,6 +410,7 @@ impl SlidingSync { let timeout = Duration::from_secs(30); let mut request = assign!(v4::Request::new(), { + conn_id: Some(self.inner.id.clone()), pos, delta_token, timeout: Some(timeout), @@ -463,19 +464,26 @@ impl SlidingSync { // Sending the `/sync` request out when end-to-end encryption is enabled means // that we need to also send out any outgoing e2ee related request out // coming from the `OlmMachine::outgoing_requests()` method. + #[cfg(feature = "e2e-encryption")] let response = { - debug!("Sliding Sync is sending the request along with outgoing E2EE requests"); + if self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true) { + debug!("Sliding Sync is sending the request along with outgoing E2EE requests"); - let (e2ee_uploads, response) = - futures_util::future::join(self.inner.client.send_outgoing_requests(), request) - .await; + let (e2ee_uploads, response) = + futures_util::future::join(self.inner.client.send_outgoing_requests(), request) + .await; - if let Err(error) = e2ee_uploads { - error!(?error, "Error while sending outgoing E2EE requests"); - } + if let Err(error) = e2ee_uploads { + error!(?error, "Error while sending outgoing E2EE requests"); + } + + response + } else { + debug!("Sliding Sync is sending the request (e2ee not enabled in this instance)"); - response + request.await + } }?; // Send the request and get a response _without_ end-to-end encryption support. @@ -1091,20 +1099,28 @@ mod tests { let server = MockServer::start().await; let client = logged_in_client(Some(server.uri())).await; - let mut ss_builder = client.sliding_sync("test-slidingsync")?; - ss_builder = ss_builder.add_list(SlidingSyncList::builder("new_list")); - - let sync = ss_builder.build().await?; + let sync = client + .sliding_sync("test-slidingsync")? + .add_list(SlidingSyncList::builder("new_list")) + .build() + .await?; - // We get to-device and e2ee even without requesting it. - assert_eq!( - sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, - Some(true) - ); - assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, Some(true)); - // But what we didn't enable... isn't enabled. + // No extensions have been explicitly enabled here. + assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None,); + assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None); assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None); + // Now enable e2ee and to-device. + let sync = client + .sliding_sync("test-slidingsync")? + .add_list(SlidingSyncList::builder("new_list")) + .with_to_device_extension( + assign!(v4::ToDeviceConfig::default(), { enabled: Some(true)}), + ) + .with_e2ee_extension(assign!(v4::E2EEConfig::default(), { enabled: Some(true)})) + .build() + .await?; + // Even without a since token, the first request will contain the extensions // configuration, at least. let txn_id = TransactionId::new(); @@ -1209,7 +1225,7 @@ mod tests { let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher) .respond_with(|request: &wiremock::Request| { - // Repeat with the txn_id in the response, if set. + // Repeat the txn_id in the response, if set. let request: PartialRequest = request.body_json().unwrap(); wiremock::ResponseTemplate::new(200).set_body_json(json!({ "txn_id": request.txn_id,