diff --git a/crates/matrix-sdk-ui/tests/integration/notification.rs b/crates/matrix-sdk-ui/tests/integration/notification.rs index 7a2c4df11c4..bdf03129117 100644 --- a/crates/matrix-sdk-ui/tests/integration/notification.rs +++ b/crates/matrix-sdk-ui/tests/integration/notification.rs @@ -20,12 +20,12 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> { "conn_id": "notifs", "extensions": { "e2ee": { - "enabled": true + "enabled": true, }, "to_device": { - "enabled": true - } - } + "enabled": true, + }, + }, }, respond with = { "pos": "0" @@ -44,9 +44,9 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> { "pos": "1", "extensions": { "to_device": { - "next_batch": "nb0" - } - } + "next_batch": "nb0", + }, + }, }, }; @@ -59,39 +59,33 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> { "conn_id": "notifs", "extensions": { "to_device": { - "since": "nb0" - } - } + "since": "nb0", + }, + }, }, respond with = { "pos": "2", "extensions": { "to_device": { - "next_batch": "nb1" - } - } + "next_batch": "nb1", + }, + }, }, }; // The to-device since token is passed from the previous request. // The extensions haven't changed, so they're not updated (sticky parameters - // ftw)... in the first request. Then, the sliding sync instance will retry - // those requests, so it will include them again; as a matter of fact, the - // last request that we assert against will contain those. + // ftw). sliding_sync_then_assert_request_and_fake_response! { [server, notification_stream] sync matches Some(Err(_)), assert request = { "conn_id": "notifs", "extensions": { - "e2ee": { - "enabled": true, - }, "to_device": { - "enabled": true, - "since": "nb1" - } - } + "since": "nb1", + }, + }, }, respond with = (code 400) { "error": "foo", diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 82faf5c294f..0972cf50879 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -280,8 +280,6 @@ impl SlidingSyncBuilder { lists, rooms, - reset_counter: Default::default(), - position: StdRwLock::new(SlidingSyncPositionMarkers { pos: None, delta_token, diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 1b3e0166a09..73a6efcd5d3 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -27,10 +27,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, future::Future, - sync::{ - atomic::{AtomicU8, Ordering}, - Arc, RwLock as StdRwLock, - }, + sync::{Arc, RwLock as StdRwLock}, time::Duration, }; @@ -59,15 +56,6 @@ use url::Url; use self::sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData}; use crate::{config::RequestConfig, Client, Result}; -/// Number of times a Sliding Sync session can expire before raising an error. -/// -/// A Sliding Sync session can expire. In this case, it is reset. However, to -/// avoid entering an infinite loop of “it's expired, let's reset, it's expired, -/// let's reset…” (maybe if the network has an issue, or the server, or anything -/// else), we define a maximum times a session can expire before -/// raising a proper error. -const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; - /// The Sliding Sync instance. /// /// It is OK to clone this type as much as you need: cloning it is cheap. @@ -111,9 +99,6 @@ pub(super) struct SlidingSyncInner { /// Rooms to unsubscribe, see [`Self::room_subscriptions`]. room_unsubscriptions: StdRwLock>, - /// Number of times a Sliding Sync session has been reset. - reset_counter: AtomicU8, - /// Internal channel used to pass messages between Sliding Sync and other /// types. internal_channel: Sender, @@ -590,35 +575,15 @@ impl SlidingSync { update_summary = self.sync_once().instrument(sync_span.clone()) => { match update_summary { Ok(updates) => { - self.inner.reset_counter.store(0, Ordering::SeqCst); - yield Ok(updates); } Err(error) => { if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { - // The session has expired. - - // Has it expired too many times? - if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) - >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION - { - sync_span.in_scope(|| { - error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"); - }); - - // The session has expired too many times, let's raise an error! - yield Err(error); - - // Terminates the loop, and terminates the stream. - break; - } - - // Let's reset the Sliding Sync session. + // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters. sync_span.in_scope(|| async { - warn!("Session expired. Restarting Sliding Sync."); + warn!("Session expired; resetting `pos` and sticky parameters"); - // To “restart” a Sliding Sync session, we set `pos` to its initial value, and uncommit the sticky parameters, so they're sent next time. { let mut position_lock = self.inner.position.write().unwrap(); position_lock.pos = None; @@ -628,15 +593,12 @@ impl SlidingSync { let _ = self.inner.sticky.write().unwrap().data_mut(); self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data()); - - debug!(?self.inner.position, "Sliding Sync has been reset"); }).await; - - continue; } yield Err(error); + // Terminates the loop, and terminates the stream. break; } } @@ -1190,7 +1152,7 @@ mod tests { } #[async_test] - async fn test_sticky_parameters_invalidated_by_reset() -> Result<()> { + async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> { let server = MockServer::start().await; let client = logged_in_client(Some(server.uri())).await; @@ -1208,26 +1170,19 @@ mod tests { let sync = sliding_sync.sync(); pin_mut!(sync); - #[derive(Clone)] - struct SlidingSyncMatcher; - - impl Match for SlidingSyncMatcher { - fn matches(&self, request: &wiremock::Request) -> bool { - request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" - && request.method == wiremock::http::Method::Post - } - } + // `pos` is `None` to start with. + assert!(sliding_sync.inner.position.read().unwrap().pos.is_none()); #[derive(Deserialize)] struct PartialRequest { txn_id: Option, } - let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher) - .respond_with(|request: &wiremock::Request| { + let _mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(|request: &Request| { // Repeat the txn_id in the response, if set. let request: PartialRequest = request.body_json().unwrap(); - wiremock::ResponseTemplate::new(200).set_body_json(json!({ + ResponseTemplate::new(200).set_body_json(json!({ "txn_id": request.txn_id, "pos": "0" })) @@ -1238,6 +1193,9 @@ mod tests { let next = sync.next().await; assert_matches!(next, Some(Ok(_update_summary))); + // `pos` has been updated. + assert_eq!(sliding_sync.inner.position.read().unwrap().pos, Some("0".to_owned())); + // Next request doesn't ask to enable the extension. let (request, _, _) = sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?; @@ -1250,9 +1208,9 @@ mod tests { drop(_mock_guard); // When responding with M_UNKNOWN_POS, that regenerates the sticky parameters, - // so they're reset. - let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher) - .respond_with(wiremock::ResponseTemplate::new(400).set_body_json(json!({ + // so they're reset. It also resets the `pos`. + let _mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(ResponseTemplate::new(400).set_body_json(json!({ "error": "foo", "errcode": "M_UNKNOWN_POS", }))) @@ -1261,14 +1219,21 @@ mod tests { let next = sync.next().await; - // The request will retry a few times, then end in an error eventually. + // The expected error is returned. assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos)); + // `pos` has been reset. + assert!(sliding_sync.inner.position.read().unwrap().pos.is_none()); + // Next request asks to enable the extension again. let (request, _, _) = sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?; + assert!(request.extensions.to_device.enabled.is_some()); + // `sync` has been stopped. + assert!(sync.next().await.is_none()); + Ok(()) }