Skip to content

Commit

Permalink
fix(sdk): Remove the Sliding Sync retry mechanism on M_UNKNOWN_POS
Browse files Browse the repository at this point in the history
fix(sdk): Remove the Sliding Sync retry mechanism on `M_UNKNOWN_POS`
  • Loading branch information
Hywan authored Jun 19, 2023
2 parents 67ade0a + 0c85b17 commit d105b72
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 84 deletions.
40 changes: 17 additions & 23 deletions crates/matrix-sdk-ui/tests/integration/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> {
"conn_id": "notifs",
"extensions": {
"e2ee": {
"enabled": true
"enabled": true,
},
"to_device": {
"enabled": true
}
}
"enabled": true,
},
},
},
respond with = {
"pos": "0"
Expand All @@ -44,9 +44,9 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> {
"pos": "1",
"extensions": {
"to_device": {
"next_batch": "nb0"
}
}
"next_batch": "nb0",
},
},
},
};

Expand All @@ -59,39 +59,33 @@ async fn test_smoke_test_notification_api() -> anyhow::Result<()> {
"conn_id": "notifs",
"extensions": {
"to_device": {
"since": "nb0"
}
}
"since": "nb0",
},
},
},
respond with = {
"pos": "2",
"extensions": {
"to_device": {
"next_batch": "nb1"
}
}
"next_batch": "nb1",
},
},
},
};

// The to-device since token is passed from the previous request.
// The extensions haven't changed, so they're not updated (sticky parameters
// ftw)... in the first request. Then, the sliding sync instance will retry
// those requests, so it will include them again; as a matter of fact, the
// last request that we assert against will contain those.
// ftw).
sliding_sync_then_assert_request_and_fake_response! {
[server, notification_stream]
sync matches Some(Err(_)),
assert request = {
"conn_id": "notifs",
"extensions": {
"e2ee": {
"enabled": true,
},
"to_device": {
"enabled": true,
"since": "nb1"
}
}
"since": "nb1",
},
},
},
respond with = (code 400) {
"error": "foo",
Expand Down
2 changes: 0 additions & 2 deletions crates/matrix-sdk/src/sliding_sync/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,6 @@ impl SlidingSyncBuilder {
lists,
rooms,

reset_counter: Default::default(),

position: StdRwLock::new(SlidingSyncPositionMarkers {
pos: None,
delta_token,
Expand Down
83 changes: 24 additions & 59 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
future::Future,
sync::{
atomic::{AtomicU8, Ordering},
Arc, RwLock as StdRwLock,
},
sync::{Arc, RwLock as StdRwLock},
time::Duration,
};

Expand Down Expand Up @@ -59,15 +56,6 @@ use url::Url;
use self::sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData};
use crate::{config::RequestConfig, Client, Result};

/// Number of times a Sliding Sync session can expire before raising an error.
///
/// A Sliding Sync session can expire. In this case, it is reset. However, to
/// avoid entering an infinite loop of “it's expired, let's reset, it's expired,
/// let's reset…” (maybe if the network has an issue, or the server, or anything
/// else), we define a maximum number of times a session can expire before
/// raising a proper error.
const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3;

/// The Sliding Sync instance.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
Expand Down Expand Up @@ -111,9 +99,6 @@ pub(super) struct SlidingSyncInner {
/// Rooms to unsubscribe, see [`Self::room_subscriptions`].
room_unsubscriptions: StdRwLock<BTreeSet<OwnedRoomId>>,

/// Number of times a Sliding Sync session has been reset.
reset_counter: AtomicU8,

/// Internal channel used to pass messages between Sliding Sync and other
/// types.
internal_channel: Sender<SlidingSyncInternalMessage>,
Expand Down Expand Up @@ -590,35 +575,15 @@ impl SlidingSync {
update_summary = self.sync_once().instrument(sync_span.clone()) => {
match update_summary {
Ok(updates) => {
self.inner.reset_counter.store(0, Ordering::SeqCst);

yield Ok(updates);
}

Err(error) => {
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// The session has expired.

// Has it expired too many times?
if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst)
>= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION
{
sync_span.in_scope(|| {
error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row");
});

// The session has expired too many times, let's raise an error!
yield Err(error);

// Terminates the loop, and terminates the stream.
break;
}

// Let's reset the Sliding Sync session.
// The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
sync_span.in_scope(|| async {
warn!("Session expired. Restarting Sliding Sync.");
warn!("Session expired; resetting `pos` and sticky parameters");

// To “restart” a Sliding Sync session, we set `pos` to its initial value, and uncommit the sticky parameters, so they're sent next time.
{
let mut position_lock = self.inner.position.write().unwrap();
position_lock.pos = None;
Expand All @@ -628,15 +593,12 @@ impl SlidingSync {
let _ = self.inner.sticky.write().unwrap().data_mut();

self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());

debug!(?self.inner.position, "Sliding Sync has been reset");
}).await;

continue;
}

yield Err(error);

// Terminates the loop, and terminates the stream.
break;
}
}
Expand Down Expand Up @@ -1190,7 +1152,7 @@ mod tests {
}

#[async_test]
async fn test_sticky_parameters_invalidated_by_reset() -> Result<()> {
async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
let server = MockServer::start().await;
let client = logged_in_client(Some(server.uri())).await;

Expand All @@ -1208,26 +1170,19 @@ mod tests {
let sync = sliding_sync.sync();
pin_mut!(sync);

#[derive(Clone)]
struct SlidingSyncMatcher;

impl Match for SlidingSyncMatcher {
fn matches(&self, request: &wiremock::Request) -> bool {
request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync"
&& request.method == wiremock::http::Method::Post
}
}
// `pos` is `None` to start with.
assert!(sliding_sync.inner.position.read().unwrap().pos.is_none());

#[derive(Deserialize)]
struct PartialRequest {
txn_id: Option<String>,
}

let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher)
.respond_with(|request: &wiremock::Request| {
let _mock_guard = Mock::given(SlidingSyncMatcher)
.respond_with(|request: &Request| {
// Repeat the txn_id in the response, if set.
let request: PartialRequest = request.body_json().unwrap();
wiremock::ResponseTemplate::new(200).set_body_json(json!({
ResponseTemplate::new(200).set_body_json(json!({
"txn_id": request.txn_id,
"pos": "0"
}))
Expand All @@ -1238,6 +1193,9 @@ mod tests {
let next = sync.next().await;
assert_matches!(next, Some(Ok(_update_summary)));

// `pos` has been updated.
assert_eq!(sliding_sync.inner.position.read().unwrap().pos, Some("0".to_owned()));

// Next request doesn't ask to enable the extension.
let (request, _, _) =
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
Expand All @@ -1250,9 +1208,9 @@ mod tests {
drop(_mock_guard);

// When responding with M_UNKNOWN_POS, that regenerates the sticky parameters,
// so they're reset.
let _mock_guard = wiremock::Mock::given(SlidingSyncMatcher)
.respond_with(wiremock::ResponseTemplate::new(400).set_body_json(json!({
// so they're reset. It also resets the `pos`.
let _mock_guard = Mock::given(SlidingSyncMatcher)
.respond_with(ResponseTemplate::new(400).set_body_json(json!({
"error": "foo",
"errcode": "M_UNKNOWN_POS",
})))
Expand All @@ -1261,14 +1219,21 @@ mod tests {

let next = sync.next().await;

// The request will retry a few times, then end in an error eventually.
// The expected error is returned.
assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));

// `pos` has been reset.
assert!(sliding_sync.inner.position.read().unwrap().pos.is_none());

// Next request asks to enable the extension again.
let (request, _, _) =
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

assert!(request.extensions.to_device.enabled.is_some());

// `sync` has been stopped.
assert!(sync.next().await.is_none());

Ok(())
}

Expand Down

0 comments on commit d105b72

Please sign in to comment.