From 30b7c4bd7f8e19cc453de363f0e31588947bd1e6 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 19 Jun 2023 09:19:45 +0200 Subject: [PATCH 1/2] fix(sdk): Remove the Sliding Sync retry mechanism on `M_UNKNOWN_POS`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `SlidingSync::sync` had a legacy behaviour when a `M_UNKNOWN_POS` error was received from the server: It was resetting `pos` and sticky parameters before re-running a sync-loop iteration, hoping to get a valid response from the server after that. This retrying mechanism was running up to 3 times in row (represented by the `MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION` constant) before stopping the `sync` for real. While it seemed a good idea, it actually brings numerous problems: 1. Each iteration in the sync-loop generates a new request, thus making the `ranges` of the requests to move forwards. For `SlidingSyncList` that are in `Selective` sync-mode, there is no problem, but for `Growing` or `Paging` sync-modes, the `ranges` increased. Thus, when a `SlidingSync` session expires, instead of returning to the caller to do something clever (like what `RoomList` does: start again with a small range so that the “first” sync after a session expiration is guaranteed to be fast), it was running larger and larger requests, up to 3 times. 2. `M_UNKNOWN_POS` _is_ an error. Yes, `SlidingSync` must reset `pos` and must invalidate sticky parameters, but the `sync` must be stopped, and the error must be returned so that the caller can do something about it. Until now, this error was likely to be missed by the caller. 3. This legacy mechanism was forbidding `SlidingSync::sync`'s caller to do something with this special error. And since the caller was blind to this error, it disallowed more smart error management. This patch removes this legacy retry mechanism entirely. When `M_UNKNOWN_POS` is received from the server, `pos` is reset and sticky parameters are invalidated as before, but the error is returned like any other error. This patch renames and updates an existing test that was testing sticky parameters invalidation on `M_UNKNOWN_POS`, to include some assertions on `pos` and to ensure that the sync-loop is stopped accordingly. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 2 - crates/matrix-sdk/src/sliding_sync/mod.rs | 83 ++++++------------- 2 files changed, 24 insertions(+), 61 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 82faf5c294f..0972cf50879 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -280,8 +280,6 @@ impl SlidingSyncBuilder { lists, rooms, - reset_counter: Default::default(), - position: StdRwLock::new(SlidingSyncPositionMarkers { pos: None, delta_token, diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 1b3e0166a09..73a6efcd5d3 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -27,10 +27,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, future::Future, - sync::{ - atomic::{AtomicU8, Ordering}, - Arc, RwLock as StdRwLock, - }, + sync::{Arc, RwLock as StdRwLock}, time::Duration, }; @@ -59,15 +56,6 @@ use url::Url; use self::sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData}; use crate::{config::RequestConfig, Client, Result}; -/// Number of times a Sliding Sync session can expire before raising an error. -/// -/// A Sliding Sync session can expire. In this case, it is reset. However, to -/// avoid entering an infinite loop of “it's expired, let's reset, it's expired, -/// let's reset…” (maybe if the network has an issue, or the server, or anything -/// else), we define a maximum times a session can expire before -/// raising a proper error. -const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; - /// The Sliding Sync instance. /// /// It is OK to clone this type as much as you need: cloning it is cheap. @@ -111,9 +99,6 @@ pub(super) struct SlidingSyncInner { /// Rooms to unsubscribe, see [`Self::room_subscriptions`]. room_unsubscriptions: StdRwLock>, - /// Number of times a Sliding Sync session has been reset. - reset_counter: AtomicU8, - /// Internal channel used to pass messages between Sliding Sync and other /// types. internal_channel: Sender, @@ -590,35 +575,15 @@ impl SlidingSync { update_summary = self.sync_once().instrument(sync_span.clone()) => { match update_summary { Ok(updates) => { - self.inner.reset_counter.store(0, Ordering::SeqCst); - yield Ok(updates); } Err(error) => { if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { - // The session has expired. - - // Has it expired too many times? - if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) - >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION - { - sync_span.in_scope(|| { - error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"); - }); - - // The session has expired too many times, let's raise an error! - yield Err(error); - - // Terminates the loop, and terminates the stream. - break; - } - - // Let's reset the Sliding Sync session. + // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters. sync_span.in_scope(|| async { - warn!("Session expired. Restarting Sliding Sync."); + warn!("Session expired; resetting `pos` and sticky parameters"); - // To “restart” a Sliding Sync session, we set `pos` to its initial value, and uncommit the sticky parameters, so they're sent next time. { let mut position_lock = self.inner.position.write().unwrap(); position_lock.pos = None; @@ -628,15 +593,12 @@ impl SlidingSync { let _ = self.inner.sticky.write().unwrap().data_mut(); self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data()); - - debug!(?self.inner.position, "Sliding Sync has been reset"); }).await; - - continue; } yield Err(error); + // Terminates the loop, and terminates the stream. break; } } @@ -1190,7 +1152,7 @@ mod tests { } #[async_test] - async fn test_sticky_parameters_invalidated_by_reset() -> Result<()> { + async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> { let server = MockServer::start().await; let client = logged_in_client(Some(server.uri())).await; @@ -1208,26 +1170,19 @@ mod tests { let sync = sliding_sync.sync(); pin_mut!(sync); - #[derive(Clone)] - struct SlidingSyncMatcher; - - impl Match for SlidingSyncMatcher { - fn matches(&self, request: &wiremock::Request) -> bool { - request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" - && request.method == wiremock::http::Method::Post - } - } + // `pos` is `None` to start with. + assert!(sliding_sync.inner.position.read().unwrap().pos.is_none()); #[derive(Deserialize)] struct PartialRequest { txn_id: Option, } - let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher) - .respond_with(|request: &wiremock::Request| { + let _mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(|request: &Request| { // Repeat the txn_id in the response, if set. let request: PartialRequest = request.body_json().unwrap(); - wiremock::ResponseTemplate::new(200).set_body_json(json!({ + ResponseTemplate::new(200).set_body_json(json!({ "txn_id": request.txn_id, "pos": "0" })) @@ -1238,6 +1193,9 @@ mod tests { let next = sync.next().await; assert_matches!(next, Some(Ok(_update_summary))); + // `pos` has been updated. + assert_eq!(sliding_sync.inner.position.read().unwrap().pos, Some("0".to_owned())); + // Next request doesn't ask to enable the extension. let (request, _, _) = sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?; @@ -1250,9 +1208,9 @@ mod tests { drop(_mock_guard); // When responding with M_UNKNOWN_POS, that regenerates the sticky parameters, - // so they're reset. - let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher) - .respond_with(wiremock::ResponseTemplate::new(400).set_body_json(json!({ + // so they're reset. It also resets the `pos`. + let _mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(ResponseTemplate::new(400).set_body_json(json!({ "error": "foo", "errcode": "M_UNKNOWN_POS", }))) @@ -1261,14 +1219,21 @@ mod tests { let next = sync.next().await; - // The request will retry a few times, then end in an error eventually. + // The expected error is returned. assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos)); + // `pos` has been reset. + assert!(sliding_sync.inner.position.read().unwrap().pos.is_none()); + // Next request asks to enable the extension again. let (request, _, _) = sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?; + assert!(request.extensions.to_device.enabled.is_some()); + // `sync` has been stopped. + assert!(sync.next().await.is_none()); + Ok(()) } From 0c85b178acfe00c462c23da4b79bd761280eff51 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 19 Jun 2023 12:12:26 +0200 Subject: [PATCH 2/2] test(ui): Fix `Notification` test. --- .../tests/integration/notification.rs | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/crates/matrix-sdk-ui/tests/integration/notification.rs b/crates/matrix-sdk-ui/tests/integration/notification.rs index 7a2c4df11c4..bdf03129117 100644 --- a/crates/matrix-sdk-ui/tests/integration/notification.rs +++ b/crates/matrix-sdk-ui/tests/integration/notification.rs @@ -20,12 +20,12 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> { "conn_id": "notifs", "extensions": { "e2ee": { - "enabled": true + "enabled": true, }, "to_device": { - "enabled": true - } - } + "enabled": true, + }, + }, }, respond with = { "pos": "0" @@ -44,9 +44,9 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> { "pos": "1", "extensions": { "to_device": { - "next_batch": "nb0" - } - } + "next_batch": "nb0", + }, + }, }, }; @@ -59,39 +59,33 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> { "conn_id": "notifs", "extensions": { "to_device": { - "since": "nb0" - } - } + "since": "nb0", + }, + }, }, respond with = { "pos": "2", "extensions": { "to_device": { - "next_batch": "nb1" - } - } + "next_batch": "nb1", + }, + }, }, }; // The to-device since token is passed from the previous request. // The extensions haven't changed, so they're not updated (sticky parameters - // ftw)... in the first request. Then, the sliding sync instance will retry - // those requests, so it will include them again; as a matter of fact, the - // last request that we assert against will contain those. + // ftw). sliding_sync_then_assert_request_and_fake_response! { [server, notification_stream] sync matches Some(Err(_)), assert request = { "conn_id": "notifs", "extensions": { - "e2ee": { - "enabled": true, - }, "to_device": { - "enabled": true, - "since": "nb1" - } - } + "since": "nb1", + }, + }, }, respond with = (code 400) { "error": "foo",