diff --git a/bindings/matrix-sdk-crypto-nodejs/src/machine.rs b/bindings/matrix-sdk-crypto-nodejs/src/machine.rs index b458d76cfdb..1d3d92807c9 100644 --- a/bindings/matrix-sdk-crypto-nodejs/src/machine.rs +++ b/bindings/matrix-sdk-crypto-nodejs/src/machine.rs @@ -152,7 +152,7 @@ impl OlmMachine { #[napi(constructor)] pub fn new() -> napi::Result { Err(napi::Error::from_reason( - "To build an `OldMachine`, please use the `initialize` method", + "To build an `OlmMachine`, please use the `initialize` method", )) } diff --git a/bindings/matrix-sdk-ffi/Cargo.toml b/bindings/matrix-sdk-ffi/Cargo.toml index c8369181589..4344e81c102 100644 --- a/bindings/matrix-sdk-ffi/Cargo.toml +++ b/bindings/matrix-sdk-ffi/Cargo.toml @@ -29,7 +29,7 @@ eyeball-im = { workspace = true } extension-trait = "1.0.1" futures-core = { workspace = true } futures-util = { workspace = true } -matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list"] } +matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui", default-features = false, features = ["e2e-encryption", "experimental-room-list", "experimental-notification"] } mime = "0.3.16" # FIXME: we currently can't feature flag anything in the api.udl, therefore we must enforce experimental-sliding-sync being exposed here.. # see https://github.com/matrix-org/matrix-rust-sdk/issues/1014 diff --git a/bindings/matrix-sdk-ffi/src/error.rs b/bindings/matrix-sdk-ffi/src/error.rs index 0e9f802f3b2..a086f731691 100644 --- a/bindings/matrix-sdk-ffi/src/error.rs +++ b/bindings/matrix-sdk-ffi/src/error.rs @@ -1,6 +1,7 @@ use std::fmt::Display; use matrix_sdk::{self, encryption::CryptoStoreError, HttpError, IdParseError, StoreError}; +use matrix_sdk_ui::notifications; #[derive(Debug, thiserror::Error)] pub enum ClientError { @@ -68,6 +69,12 @@ impl From for ClientError { } } +impl From for ClientError { + fn from(e: notifications::Error) -> Self { + Self::new(e) + } +} + #[derive(Debug, thiserror::Error, uniffi::Error)] #[uniffi(flat_error)] pub enum RoomError { diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 77c53c07563..5841b2d5a4f 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -15,7 +15,9 @@ use matrix_sdk::{ LoopCtrl, RoomListEntry as MatrixRoomEntry, SlidingSyncBuilder as MatrixSlidingSyncBuilder, SlidingSyncListLoadingState, SlidingSyncMode, }; -use matrix_sdk_ui::timeline::SlidingSyncRoomExt; +use matrix_sdk_ui::{ + notifications::NotificationSync as MatrixNotificationSync, timeline::SlidingSyncRoomExt, +}; use tracing::{error, warn}; use crate::{ @@ -869,6 +871,55 @@ impl SlidingSyncBuilder { } } +#[uniffi::export(callback_interface)] +pub trait NotificationSyncListener: Sync + Send { + /// Called whenever the notification sync loop terminates, and must be + /// restarted. + fn did_terminate(&self); +} + +/// Full context for the notification sync loop. +#[derive(uniffi::Object)] +pub struct NotificationSync { + /// Unused field, kept for its `Drop` semantics. + _handle: TaskHandle, +} + +impl NotificationSync { + fn start( + notification: MatrixNotificationSync, + listener: Box, + ) -> TaskHandle { + TaskHandle::new(RUNTIME.spawn(async move { + let stream = notification.sync(); + pin_mut!(stream); + + loop { + match stream.next().await { + Some(Ok(())) => { + // Yay. + } + + None => { + warn!("Notification sliding sync ended"); + break; + } + + Some(Err(err)) => { + // The internal sliding sync instance already handles retries for us, so if + // we get an error here, it means the maximum number of retries has been + // reached, and there's not much we can do anymore. + warn!("Error when handling notifications: {err}"); + break; + } + } + } + + listener.did_terminate(); + })) + } +} + #[uniffi::export] impl Client { /// Creates a new Sliding Sync instance with the given identifier. @@ -878,4 +929,16 @@ impl Client { let inner = self.inner.sliding_sync(id)?; Ok(Arc::new(SlidingSyncBuilder { inner, client: self.clone() })) } + + pub fn notification_sliding_sync( + &self, + id: String, + listener: Box, + ) -> Result, ClientError> { + RUNTIME.block_on(async move { + let inner = MatrixNotificationSync::new(id, self.inner.clone()).await?; + let handle = NotificationSync::start(inner, listener); + Ok(Arc::new(NotificationSync { _handle: handle })) + }) + } } diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index fe664e34d93..3ae90d905ca 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -4,7 +4,7 @@ version = "0.6.0" edition = "2021" [features] -default = ["e2e-encryption", "native-tls", "experimental-room-list"] +default = ["e2e-encryption", "native-tls", "experimental-room-list", "experimental-notification"] e2e-encryption = ["matrix-sdk/e2e-encryption"] @@ -12,6 +12,7 @@ native-tls = ["matrix-sdk/native-tls"] rustls-tls = ["matrix-sdk/rustls-tls"] experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"] +experimental-notification = ["experimental-sliding-sync", "dep:async-stream"] experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"] testing = ["dep:eyeball-im-util"] diff --git a/crates/matrix-sdk-ui/src/lib.rs b/crates/matrix-sdk-ui/src/lib.rs index a08debc48f6..831489187c4 100644 --- a/crates/matrix-sdk-ui/src/lib.rs +++ b/crates/matrix-sdk-ui/src/lib.rs @@ -14,6 +14,8 @@ mod events; +#[cfg(feature = "experimental-notification")] +pub mod notifications; #[cfg(feature = "experimental-room-list")] pub mod room_list; pub mod timeline; diff --git a/crates/matrix-sdk-ui/src/notifications/mod.rs b/crates/matrix-sdk-ui/src/notifications/mod.rs new file mode 100644 index 00000000000..6ede78ad241 --- /dev/null +++ b/crates/matrix-sdk-ui/src/notifications/mod.rs @@ -0,0 +1,117 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for that specific language governing permissions and +// limitations under the License. + +//! Notification API. +//! +//! The notification API is a high-level helper that is designed to take care of +//! handling the synchronization of notifications, be they received within the +//! app or within a dedicated notification process (e.g. the [NSE] process on +//! iOS devices). +//! +//! Under the hood, this uses a sliding sync instance configured with no lists, +//! but that enables the e2ee and to-device extensions, so that it can both +//! handle encryption and manage encryption keys; that's sufficient to decrypt +//! messages received in the notification processes. +//! +//! As this may be used across different processes, this also makes sure that +//! there's only one process writing to the databases holding encryption +//! information. TODO as of 2023-06-06, this hasn't been done yet. +//! +//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension + +use async_stream::stream; +use futures_core::stream::Stream; +use futures_util::{pin_mut, StreamExt}; +use matrix_sdk::{Client, SlidingSync}; +use ruma::{api::client::sync::sync_events::v4, assign}; +use tracing::error; + +/// High-level helper for synchronizing notifications using sliding sync. +/// +/// See the module's documentation for more details. +#[derive(Clone)] +pub struct NotificationSync { + sliding_sync: SlidingSync, +} + +impl NotificationSync { + /// Creates a new instance of a `NotificationSync`. + /// + /// This will create and manage an instance of [`matrix_sdk::SlidingSync`]. + /// The `id` is used as the identifier of that instance, as such make + /// sure to not reuse a name used by another Sliding Sync instance, at + /// the risk of causing problems. + pub async fn new(id: impl Into, client: Client) -> Result { + let sliding_sync = client + .sliding_sync(id)? + .enable_caching()? + .with_to_device_extension( + assign!(v4::ToDeviceConfig::default(), { enabled: Some(true)}), + ) + .with_e2ee_extension(assign!(v4::E2EEConfig::default(), { enabled: Some(true)})) + .build() + .await?; + + Ok(Self { sliding_sync }) + } + + /// Start synchronization of notifications. + /// + /// This should be regularly polled, so as to ensure that the notifications + /// are sync'd. + pub fn sync(&self) -> impl Stream> + '_ { + stream!({ + let sync = self.sliding_sync.sync(); + + pin_mut!(sync); + + loop { + match sync.next().await { + Some(Ok(update_summary)) => { + // This API is only concerned with the e2ee and to-device extensions. + // Warn if anything weird has been received from the proxy. + if !update_summary.lists.is_empty() || !update_summary.rooms.is_empty() { + yield Err(Error::UnexpectedNonEmptyListsOrRooms); + } else { + // Cool cool, let's do it again. + yield Ok(()); + } + + continue; + } + + Some(Err(err)) => { + yield Err(err.into()); + + break; + } + + None => { + break; + } + } + } + }) + } +} + +/// Errors for the [`NotificationSync`]. +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Unexpected rooms or lists in the sliding sync response.")] + UnexpectedNonEmptyListsOrRooms, + + #[error("Something wrong happened in sliding sync: {0:#}")] + SlidingSyncError(#[from] matrix_sdk::Error), +} diff --git a/crates/matrix-sdk-ui/src/room_list/mod.rs b/crates/matrix-sdk-ui/src/room_list/mod.rs index 6b7d9975af3..0e06c5ac229 100644 --- a/crates/matrix-sdk-ui/src/room_list/mod.rs +++ b/crates/matrix-sdk-ui/src/room_list/mod.rs @@ -77,7 +77,7 @@ use matrix_sdk::{ }; pub use room::*; use ruma::{ - api::client::sync::sync_events::v4::SyncRequestListFilters, + api::client::sync::sync_events::v4::{E2EEConfig, SyncRequestListFilters, ToDeviceConfig}, assign, events::{StateEventType, TimelineEventType}, OwnedRoomId, RoomId, @@ -104,6 +104,9 @@ impl RoomList { .enable_caching() .map_err(Error::SlidingSync)? .with_common_extensions() + // TODO different strategy when the encryption sync is in main by default + .with_e2ee_extension(assign!(E2EEConfig::default(), { enabled: Some(true) })) + .with_to_device_extension(assign!(ToDeviceConfig::default(), { enabled: Some(true) })) // TODO revert to `add_cached_list` when reloading rooms from the cache is blazingly // fast .add_list( diff --git a/crates/matrix-sdk-ui/tests/integration/main.rs b/crates/matrix-sdk-ui/tests/integration/main.rs index a507258ba60..4489ad5933a 100644 --- a/crates/matrix-sdk-ui/tests/integration/main.rs +++ b/crates/matrix-sdk-ui/tests/integration/main.rs @@ -21,8 +21,11 @@ use wiremock::{ Mock, MockServer, ResponseTemplate, }; +#[cfg(feature = "experimental-notification")] +mod notification; #[cfg(feature = "experimental-room-list")] mod room_list; +mod sliding_sync; mod timeline; #[cfg(all(test, not(target_arch = "wasm32")))] diff --git a/crates/matrix-sdk-ui/tests/integration/notification.rs b/crates/matrix-sdk-ui/tests/integration/notification.rs new file mode 100644 index 00000000000..7a2c4df11c4 --- /dev/null +++ b/crates/matrix-sdk-ui/tests/integration/notification.rs @@ -0,0 +1,130 @@ +use futures_util::{pin_mut, StreamExt as _}; +use matrix_sdk_test::async_test; +use matrix_sdk_ui::notifications::NotificationSync; + +use crate::{logged_in_client, sliding_sync_then_assert_request_and_fake_response}; + +#[async_test] +async fn test_smoke_test_notification_api() -> anyhow::Result<()> { + let (client, server) = logged_in_client().await; + + let notification_api = NotificationSync::new("notifs".to_owned(), client).await?; + + let notification_stream = notification_api.sync(); + pin_mut!(notification_stream); + + // Requests enable the e2ee and to_device extensions on the first run. + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + "extensions": { + "e2ee": { + "enabled": true + }, + "to_device": { + "enabled": true + } + } + }, + respond with = { + "pos": "0" + }, + }; + + // The request then passes the `pos`ition marker to the next request, as usual + // in sliding sync. The extensions haven't changed, so they're not updated + // (sticky parameters ftw). + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + }, + respond with = { + "pos": "1", + "extensions": { + "to_device": { + "next_batch": "nb0" + } + } + }, + }; + + // The to-device since token is passed from the previous request. + // The extensions haven't changed, so they're not updated (sticky parameters + // ftw). + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + "extensions": { + "to_device": { + "since": "nb0" + } + } + }, + respond with = { + "pos": "2", + "extensions": { + "to_device": { + "next_batch": "nb1" + } + } + }, + }; + + // The to-device since token is passed from the previous request. + // The extensions haven't changed, so they're not updated (sticky parameters + // ftw)... in the first request. Then, the sliding sync instance will retry + // those requests, so it will include them again; as a matter of fact, the + // last request that we assert against will contain those. + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + sync matches Some(Err(_)), + assert request = { + "conn_id": "notifs", + "extensions": { + "e2ee": { + "enabled": true, + }, + "to_device": { + "enabled": true, + "since": "nb1" + } + } + }, + respond with = (code 400) { + "error": "foo", + "errcode": "M_UNKNOWN_POS", + }, + }; + + // The notification stream will stop, as it ran into an error. + assert!(notification_stream.next().await.is_none()); + + // Start a new sync. + let notification_stream = notification_api.sync(); + pin_mut!(notification_stream); + + // The next request will contain sticky parameters again. + sliding_sync_then_assert_request_and_fake_response! { + [server, notification_stream] + assert request = { + "conn_id": "notifs", + "extensions": { + "e2ee": { + "enabled": true + }, + "to_device": { + "enabled": true, + "since": "nb1" + } + } + }, + respond with = { + "pos": "a" + }, + }; + + Ok(()) +} diff --git a/crates/matrix-sdk-ui/tests/integration/room_list.rs b/crates/matrix-sdk-ui/tests/integration/room_list.rs index b5f51b90ecb..4fc4347e2ce 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list.rs @@ -21,7 +21,7 @@ use ruma::{ }; use serde_json::json; use stream_assert::{assert_next_eq, assert_pending}; -use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate}; +use wiremock::MockServer; use crate::{ logged_in_client, @@ -35,52 +35,34 @@ async fn new_room_list() -> Result<(MockServer, RoomList), Error> { Ok((server, room_list)) } -#[derive(Copy, Clone)] -struct SlidingSyncMatcher; - -impl Match for SlidingSyncMatcher { - fn matches(&self, request: &Request) -> bool { - request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" - && request.method == Method::Post - } -} - +// Same macro as in the main, with additional checking that the state +// before/after the sync loop match those we expect. macro_rules! sync_then_assert_request_and_fake_response { ( - [$server:ident, $room_list:ident, $room_list_sync_stream:ident] + [$server:ident, $room_list:ident, $stream:ident] $( states = $pre_state:pat => $post_state:pat, )? - assert request = { $( $request_json:tt )* }, + assert request >= { $( $request_json:tt )* }, respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } $(,)? ) => { sync_then_assert_request_and_fake_response! { - [$server, $room_list, $room_list_sync_stream] + [$server, $room_list, $stream] sync matches Some(Ok(_)), $( states = $pre_state => $post_state, )? - assert request = { $( $request_json )* }, + assert request >= { $( $request_json )* }, respond with = $( ( code $code ) )? { $( $response_json )* }, } }; ( - [$server:ident, $room_list:ident, $room_list_sync_stream:ident] + [$server:ident, $room_list:ident, $stream:ident] sync matches $sync_result:pat, $( states = $pre_state:pat => $post_state:pat, )? - assert request = { $( $request_json:tt )* }, + assert request >= { $( $request_json:tt )* }, respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } $(,)? ) => { { - let _code = 200; - $( let _code = $code; )? - - let _mock_guard = Mock::given(SlidingSyncMatcher) - .respond_with(ResponseTemplate::new(_code).set_body_json( - json!({ $( $response_json )* }) - )) - .mount_as_scoped(&$server) - .await; - $( use State::*; @@ -89,26 +71,12 @@ macro_rules! sync_then_assert_request_and_fake_response { assert_matches!(state.get(), $pre_state, "pre state"); )? - let next = $room_list_sync_stream.next().await; - - assert_matches!(next, $sync_result, "sync's result"); - - for request in $server.received_requests().await.expect("Request recording has been disabled").iter().rev() { - if SlidingSyncMatcher.matches(request) { - let json_value = serde_json::from_slice::(&request.body).unwrap(); - - if let Err(error) = assert_json_diff::assert_json_matches_no_panic( - &json_value, - &json!({ $( $request_json )* }), - assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive), - ) { - dbg!(json_value); - panic!("{}", error); - } - - break; - } - } + let next = super::sliding_sync_then_assert_request_and_fake_response! { + [$server, $stream] + sync matches $sync_result, + assert request >= { $( $request_json )* }, + respond with = $( ( code $code ) )? { $( $response_json )* }, + }; $( assert_matches!(state.next().now_or_never(), Some(Some($post_state)), "post state"); )? @@ -247,7 +215,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [ @@ -310,13 +278,12 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [ [0, 49], ], - "timeline_limit": 1, }, VISIBLE_ROOMS: { "ranges": [[0, 19]], @@ -383,7 +350,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 99]], @@ -425,7 +392,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = CarryOn => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 149]], @@ -467,7 +434,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = CarryOn => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 199]], @@ -525,7 +492,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -553,7 +520,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -595,7 +562,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -644,7 +611,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = Init => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // The default range, in selective sync-mode. @@ -669,7 +636,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // Still the default range, in selective sync-mode. @@ -693,7 +660,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = FirstRooms => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // In `FirstRooms`, the sync-mode has changed to growing, with @@ -730,7 +697,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // In `AllRooms`, the sync-mode is still growing, but the range @@ -766,7 +733,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = AllRooms => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // In `AllRooms`, the sync-mode is still growing, and the range @@ -800,7 +767,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // Due to the error, the range is reset to its initial value. @@ -835,7 +802,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = CarryOn => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // No error. The range is making progress. @@ -867,7 +834,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { [server, room_list, sync] sync matches Some(Err(_)), states = CarryOn => Terminated { .. }, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // Range is making progress and is even reaching the maximum @@ -901,7 +868,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Terminated { .. } => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { // An error was received at the previous sync iteration. @@ -946,7 +913,7 @@ async fn test_entries_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1004,7 +971,7 @@ async fn test_entries_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -1079,7 +1046,7 @@ async fn test_entries_stream_with_updated_filter() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1133,7 +1100,7 @@ async fn test_entries_stream_with_updated_filter() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 9]], @@ -1220,7 +1187,7 @@ async fn test_invites_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1246,7 +1213,7 @@ async fn test_invites_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 0]], @@ -1306,7 +1273,7 @@ async fn test_invites_stream() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 0]], @@ -1375,7 +1342,7 @@ async fn test_room() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": { @@ -1415,7 +1382,7 @@ async fn test_room() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "1", "lists": { @@ -1475,7 +1442,7 @@ async fn test_room_subscription() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1533,7 +1500,7 @@ async fn test_room_subscription() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 2]], @@ -1565,7 +1532,7 @@ async fn test_room_subscription() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 2]], @@ -1594,7 +1561,7 @@ async fn test_room_unread_notifications() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1634,7 +1601,7 @@ async fn test_room_unread_notifications() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 0]], @@ -1681,7 +1648,7 @@ async fn test_room_timeline() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": { @@ -1715,7 +1682,7 @@ async fn test_room_timeline() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": {}, @@ -1765,7 +1732,7 @@ async fn test_room_latest_event() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": { @@ -1796,7 +1763,7 @@ async fn test_room_latest_event() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": {}, @@ -1820,7 +1787,7 @@ async fn test_room_latest_event() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] - assert request = {}, + assert request >= {}, respond with = { "pos": "0", "lists": {}, @@ -1862,7 +1829,7 @@ async fn test_input_viewport() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Init => FirstRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 19]], @@ -1879,7 +1846,7 @@ async fn test_input_viewport() -> Result<(), Error> { sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = FirstRooms => AllRooms, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 49]], @@ -1902,17 +1869,17 @@ async fn test_input_viewport() -> Result<(), Error> { assert!(room_list.apply_input(Input::Viewport(vec![10..=15, 20..=25])).await.is_ok()); + // The `timeline_limit` is not repeated because it's sticky. sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = AllRooms => CarryOn, - assert request = { + assert request >= { "lists": { ALL_ROOMS: { "ranges": [[0, 49]], }, VISIBLE_ROOMS: { "ranges": [[10, 15], [20, 25]], - "timeline_limit": 20, }, INVITES: { "ranges": [[0, 99]], diff --git a/crates/matrix-sdk-ui/tests/integration/sliding_sync.rs b/crates/matrix-sdk-ui/tests/integration/sliding_sync.rs new file mode 100644 index 00000000000..fd3c019bb1f --- /dev/null +++ b/crates/matrix-sdk-ui/tests/integration/sliding_sync.rs @@ -0,0 +1,99 @@ +//! Helpers for integration tests involving sliding sync. + +use wiremock::{http::Method, Match, Request}; + +pub(crate) struct SlidingSyncMatcher; + +#[derive(serde::Deserialize)] +pub(crate) struct PartialSlidingSyncRequest { + pub txn_id: Option, +} + +impl Match for SlidingSyncMatcher { + fn matches(&self, request: &Request) -> bool { + request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" + && request.method == Method::Post + } +} + +/// Run a single sliding sync request, checking that the request is a subset of +/// what we expect it to be, and providing the given next response. +#[macro_export] +macro_rules! sliding_sync_then_assert_request_and_fake_response { + ( + [$server:ident, $stream:ident] + assert request $sign:tt { $( $request_json:tt )* }, + respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } + $(,)? + ) => { + sliding_sync_then_assert_request_and_fake_response! { + [$server, $stream] + sync matches Some(Ok(_)), + assert request $sign { $( $request_json )* }, + respond with = $( ( code $code ) )? { $( $response_json )* }, + } + }; + + ( + [$server:ident, $stream:ident] + sync matches $sync_result:pat, + assert request $sign:tt { $( $request_json:tt )* }, + respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } + $(,)? + ) => { + { + use $crate::sliding_sync::{SlidingSyncMatcher, PartialSlidingSyncRequest}; + use wiremock::{Mock, ResponseTemplate, Match as _, Request}; + use assert_matches::assert_matches; + use serde_json::json; + + let _code = 200; + $( let _code = $code; )? + + let _mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(move |request: &Request| { + let partial_request: PartialSlidingSyncRequest = request.body_json().unwrap(); + // Repeat the transaction id in the response, to validate sticky parameters. + ResponseTemplate::new(_code).set_body_json( + json!({ + "txn_id": partial_request.txn_id, + $( $response_json )* + }) + ) + }) + .mount_as_scoped(&$server) + .await; + + let next = $stream.next().await; + + assert_matches!(next, $sync_result, "sync's result"); + + for request in $server.received_requests().await.expect("Request recording has been disabled").iter().rev() { + if SlidingSyncMatcher.matches(request) { + let mut json_value = serde_json::from_slice::(&request.body).unwrap(); + + // Strip the transaction id, if present. + if let Some(root) = json_value.as_object_mut() { + root.remove("txn_id"); + } + + if let Err(error) = assert_json_diff::assert_json_matches_no_panic( + &json_value, + &json!({ $( $request_json )* }), + $crate::sliding_sync_then_assert_request_and_fake_response!(@assertion_config $sign) + ) { + dbg!(json_value); + panic!("{}", error); + } + + break; + } + } + + next + } + }; + + (@assertion_config >=) => { assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive) }; + (@assertion_config =) => { assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict) }; +} diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 48c6b387f13..82faf5c294f 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -270,15 +270,10 @@ impl SlidingSyncBuilder { // auto-discovered by the client, if any. let sliding_sync_proxy = self.sliding_sync_proxy.or_else(|| client.sliding_sync_proxy()); - // Always enable to-device events and the e2ee-extension on the initial request, - // no matter what the caller wants. - let mut extensions = self.extensions.unwrap_or_default(); - extensions.to_device.enabled = Some(true); - extensions.e2ee.enabled = Some(true); - Ok(SlidingSync::new(SlidingSyncInner { - _id: Some(self.id), + id: self.id, sliding_sync_proxy, + client, storage_key: self.storage_key, @@ -294,7 +289,10 @@ impl SlidingSyncBuilder { }), sticky: StdRwLock::new(SlidingSyncStickyManager::new( - SlidingSyncStickyParameters::new(self.subscriptions, extensions), + SlidingSyncStickyParameters::new( + self.subscriptions, + self.extensions.unwrap_or_default(), + ), )), room_unsubscriptions: Default::default(), diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index bb8dc0ed52f..1b3e0166a09 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -85,7 +85,7 @@ pub(super) struct SlidingSyncInner { /// A unique identifier for this instance of sliding sync. /// /// Used to distinguish different connections to the sliding sync proxy. - _id: Option, + id: String, /// Customize the sliding sync proxy URL. sliding_sync_proxy: Option, @@ -410,6 +410,7 @@ impl SlidingSync { let timeout = Duration::from_secs(30); let mut request = assign!(v4::Request::new(), { + conn_id: Some(self.inner.id.clone()), pos, delta_token, timeout: Some(timeout), @@ -463,19 +464,26 @@ impl SlidingSync { // Sending the `/sync` request out when end-to-end encryption is enabled means // that we need to also send out any outgoing e2ee related request out // coming from the `OlmMachine::outgoing_requests()` method. + #[cfg(feature = "e2e-encryption")] let response = { - debug!("Sliding Sync is sending the request along with outgoing E2EE requests"); + if self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true) { + debug!("Sliding Sync is sending the request along with outgoing E2EE requests"); - let (e2ee_uploads, response) = - futures_util::future::join(self.inner.client.send_outgoing_requests(), request) - .await; + let (e2ee_uploads, response) = + futures_util::future::join(self.inner.client.send_outgoing_requests(), request) + .await; - if let Err(error) = e2ee_uploads { - error!(?error, "Error while sending outgoing E2EE requests"); - } + if let Err(error) = e2ee_uploads { + error!(?error, "Error while sending outgoing E2EE requests"); + } + + response + } else { + debug!("Sliding Sync is sending the request (e2ee not enabled in this instance)"); - response + request.await + } }?; // Send the request and get a response _without_ end-to-end encryption support. @@ -1091,20 +1099,28 @@ mod tests { let server = MockServer::start().await; let client = logged_in_client(Some(server.uri())).await; - let mut ss_builder = client.sliding_sync("test-slidingsync")?; - ss_builder = ss_builder.add_list(SlidingSyncList::builder("new_list")); - - let sync = ss_builder.build().await?; + let sync = client + .sliding_sync("test-slidingsync")? + .add_list(SlidingSyncList::builder("new_list")) + .build() + .await?; - // We get to-device and e2ee even without requesting it. - assert_eq!( - sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, - Some(true) - ); - assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, Some(true)); - // But what we didn't enable... isn't enabled. + // No extensions have been explicitly enabled here. + assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None,); + assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None); assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None); + // Now enable e2ee and to-device. + let sync = client + .sliding_sync("test-slidingsync")? + .add_list(SlidingSyncList::builder("new_list")) + .with_to_device_extension( + assign!(v4::ToDeviceConfig::default(), { enabled: Some(true)}), + ) + .with_e2ee_extension(assign!(v4::E2EEConfig::default(), { enabled: Some(true)})) + .build() + .await?; + // Even without a since token, the first request will contain the extensions // configuration, at least. let txn_id = TransactionId::new(); @@ -1209,7 +1225,7 @@ mod tests { let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher) .respond_with(|request: &wiremock::Request| { - // Repeat with the txn_id in the response, if set. + // Repeat the txn_id in the response, if set. let request: PartialRequest = request.body_json().unwrap(); wiremock::ResponseTemplate::new(200).set_body_json(json!({ "txn_id": request.txn_id,