Skip to content

Commit

Permalink
sliding sync: lazily generate and include the transaction id, only if…
Browse files Browse the repository at this point in the history
… it's useful (matrix-org#2063)

* feat: lazily generate and include the transaction id, only if it's useful
* chore: add a small `LazyTransactionId` wrapper that ensures it's only created once

---------

Signed-off-by: Benjamin Bouvier <[email protected]>
  • Loading branch information
bnjbvr authored Jun 15, 2023
1 parent 16a63d3 commit 1fd039c
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 32 deletions.
19 changes: 11 additions & 8 deletions crates/matrix-sdk/src/sliding_sync/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use tokio::sync::broadcast::Sender;
use tracing::{instrument, warn};

use self::sticky::SlidingSyncListStickyParameters;
use super::{sticky_parameters::SlidingSyncStickyManager, Error, SlidingSyncInternalMessage};
use super::{
sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
Error, SlidingSyncInternalMessage,
};
use crate::Result;

/// Should this [`SlidingSyncList`] be stored in the cache, and automatically
Expand Down Expand Up @@ -197,7 +200,7 @@ impl SlidingSyncList {
/// ([`SlidingSyncListRequestGenerator`]).
pub(super) fn next_request(
&self,
txn_id: &TransactionId,
txn_id: &mut LazyTransactionId,
) -> Result<v4::SyncRequestList, Error> {
self.inner.next_request(txn_id)
}
Expand Down Expand Up @@ -343,7 +346,7 @@ impl SlidingSyncListInner {
}

/// Update the state to the next request, and return it.
fn next_request(&self, txn_id: &TransactionId) -> Result<v4::SyncRequestList, Error> {
fn next_request(&self, txn_id: &mut LazyTransactionId) -> Result<v4::SyncRequestList, Error> {
let ranges = {
// Use a dedicated scope to ensure the lock is released before continuing.
let mut request_generator = self.request_generator.write().unwrap();
Expand All @@ -358,7 +361,7 @@ impl SlidingSyncListInner {
/// Build a [`SyncRequestList`][v4::SyncRequestList] based on the current
/// state of the request generator.
#[instrument(skip(self), fields(name = self.name))]
fn request(&self, ranges: Ranges, txn_id: &TransactionId) -> v4::SyncRequestList {
fn request(&self, ranges: Ranges, txn_id: &mut LazyTransactionId) -> v4::SyncRequestList {
use ruma::UInt;
let ranges =
ranges.into_iter().map(|r| (UInt::from(*r.start()), UInt::from(*r.end()))).collect();
Expand Down Expand Up @@ -989,7 +992,7 @@ mod tests {
let room1 = room_id!("!room1:bar.org");

// Simulate a request.
let _ = list.next_request("tid".into());
let _ = list.next_request(&mut LazyTransactionId::new());

// A new response.
let sync0: v4::SyncOp = serde_json::from_value(json!({
Expand Down Expand Up @@ -1025,7 +1028,7 @@ mod tests {
$(
{
// Generate a new request.
let request = $list.next_request("tid".into()).unwrap();
let request = $list.next_request(&mut LazyTransactionId::new()).unwrap();

assert_eq!(
request.ranges,
Expand Down Expand Up @@ -1482,7 +1485,7 @@ mod tests {
// Initial range.
for _ in 0..=1 {
// Simulate a request.
let _ = list.next_request("tid".into());
let _ = list.next_request(&mut LazyTransactionId::new());

// A new response.
let sync: v4::SyncOp = serde_json::from_value(json!({
Expand Down Expand Up @@ -1525,7 +1528,7 @@ mod tests {
});

// Simulate a request.
let _ = list.next_request("tid".into());
let _ = list.next_request(&mut LazyTransactionId::new());

// A new response.
let sync: v4::SyncOp = serde_json::from_value(json!({
Expand Down
53 changes: 34 additions & 19 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use ruma::{
error::ErrorKind,
sync::sync_events::v4::{self, ExtensionsConfig},
},
assign, OwnedRoomId, OwnedTransactionId, RoomId, TransactionId,
assign, OwnedRoomId, RoomId,
};
use serde::{Deserialize, Serialize};
use tokio::{
Expand All @@ -56,7 +56,7 @@ use tokio::{
use tracing::{debug, error, instrument, warn, Instrument, Span};
use url::Url;

use self::sticky_parameters::{SlidingSyncStickyManager, StickyData};
use self::sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData};
use crate::{config::RequestConfig, Client, Result};

/// Number of times a Sliding Sync session can expire before raising an error.
Expand Down Expand Up @@ -376,7 +376,7 @@ impl SlidingSync {

async fn generate_sync_request(
&self,
txn_id: OwnedTransactionId,
txn_id: &mut LazyTransactionId,
) -> Result<(v4::Request, RequestConfig, BTreeSet<OwnedRoomId>)> {
// Collect requests for lists.
let mut requests_lists = BTreeMap::new();
Expand All @@ -385,7 +385,7 @@ impl SlidingSync {
let lists = self.inner.lists.read().await;

for (name, list) in lists.iter() {
requests_lists.insert(name.clone(), list.next_request(&txn_id)?);
requests_lists.insert(name.clone(), list.next_request(txn_id)?);
}
}

Expand All @@ -403,7 +403,6 @@ impl SlidingSync {
let timeout = Duration::from_secs(30);

let mut request = assign!(v4::Request::new(), {
txn_id: Some(txn_id.to_string()),
pos,
delta_token,
timeout: Some(timeout),
Expand All @@ -414,7 +413,7 @@ impl SlidingSync {
{
let mut sticky_params = self.inner.sticky.write().unwrap();

sticky_params.maybe_apply(&mut request, &txn_id);
sticky_params.maybe_apply(&mut request, txn_id);

// Set the to_device token if the extension is enabled.
if sticky_params.data().extensions.to_device.enabled == Some(true) {
Expand All @@ -423,6 +422,11 @@ impl SlidingSync {
}
}

// Apply the transaction id if one was generated.
if let Some(txn_id) = txn_id.get() {
request.txn_id = Some(txn_id.to_string());
}

Ok((
// The request itself.
request,
Expand All @@ -436,7 +440,7 @@ impl SlidingSync {
#[instrument(skip_all, fields(pos))]
async fn sync_once(&self) -> Result<UpdateSummary> {
let (request, request_config, requested_room_unsubscriptions) =
self.generate_sync_request(TransactionId::new()).await?;
self.generate_sync_request(&mut LazyTransactionId::new()).await?;

debug!("Sending the sliding sync request");

Expand Down Expand Up @@ -775,7 +779,7 @@ impl StickyData for SlidingSyncStickyParameters {
mod tests {
use assert_matches::assert_matches;
use futures_util::{pin_mut, StreamExt};
use ruma::{api::client::sync::sync_events::v4::ToDeviceConfig, room_id};
use ruma::{api::client::sync::sync_events::v4::ToDeviceConfig, room_id, TransactionId};
use serde_json::json;
use wiremock::{Match, MockServer};

Expand Down Expand Up @@ -918,7 +922,7 @@ mod tests {
let mut request = v4::Request::default();
request.txn_id = Some(txn_id.to_string());

sticky.maybe_apply(&mut request, txn_id);
sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

assert!(request.txn_id.is_some());
assert_eq!(request.room_subscriptions.len(), 1);
Expand All @@ -944,7 +948,7 @@ mod tests {
let txn_id1: &TransactionId = "tid456".into();
let mut request1 = v4::Request::default();
request1.txn_id = Some(txn_id1.to_string());
sticky.maybe_apply(&mut request1, txn_id1);
sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));

assert!(sticky.is_invalidated());
assert_eq!(request1.room_subscriptions.len(), 2);
Expand All @@ -953,7 +957,7 @@ mod tests {
let mut request2 = v4::Request::default();
request2.txn_id = Some(txn_id2.to_string());

sticky.maybe_apply(&mut request2, txn_id2);
sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
assert!(sticky.is_invalidated());
assert_eq!(request2.room_subscriptions.len(), 2);

Expand Down Expand Up @@ -993,7 +997,7 @@ mod tests {
let txn_id: &TransactionId = "tid123".into();
let mut request = v4::Request::default();
request.txn_id = Some(txn_id.to_string());
sticky.maybe_apply(&mut request, txn_id);
sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
assert!(sticky.is_invalidated());
assert_eq!(request.extensions.to_device.enabled, None);
assert_eq!(request.extensions.to_device.since, None);
Expand Down Expand Up @@ -1023,7 +1027,9 @@ mod tests {
// Even without a since token, the first request will contain the extensions
// configuration, at least.
let txn_id = TransactionId::new();
let (request, _, _) = sync.generate_sync_request(txn_id.clone()).await?;
let (request, _, _) = sync
.generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
.await?;

assert_eq!(request.extensions.e2ee.enabled, Some(true));
assert_eq!(request.extensions.to_device.enabled, Some(true));
Expand All @@ -1041,7 +1047,9 @@ mod tests {

// Regenerating a request will yield the same one.
let txn_id2 = TransactionId::new();
let (request, _, _) = sync.generate_sync_request(txn_id2.clone()).await?;
let (request, _, _) = sync
.generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
.await?;

assert_eq!(request.extensions.e2ee.enabled, Some(true));
assert_eq!(request.extensions.to_device.enabled, Some(true));
Expand All @@ -1059,7 +1067,9 @@ mod tests {

// The next request should contain no sticky parameters.
let txn_id = TransactionId::new();
let (request, _, _) = sync.generate_sync_request(txn_id).await?;
let (request, _, _) = sync
.generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
.await?;
assert!(request.extensions.e2ee.enabled.is_none());
assert!(request.extensions.to_device.enabled.is_none());
assert!(request.extensions.to_device.since.is_none());
Expand All @@ -1071,7 +1081,9 @@ mod tests {
sync.inner.position.write().unwrap().to_device_token = Some(since_token.to_owned());

let txn_id = TransactionId::new();
let (request, _, _) = sync.generate_sync_request(txn_id).await?;
let (request, _, _) = sync
.generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
.await?;

assert!(request.extensions.e2ee.enabled.is_none());
assert!(request.extensions.to_device.enabled.is_none());
Expand All @@ -1092,7 +1104,8 @@ mod tests {
.await?;

// First request asks to enable the extension.
let (request, _, _) = sliding_sync.generate_sync_request(TransactionId::new()).await?;
let (request, _, _) =
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
assert!(request.extensions.to_device.enabled.is_some());

let sync = sliding_sync.sync();
Expand Down Expand Up @@ -1129,7 +1142,8 @@ mod tests {
assert_matches!(next, Some(Ok(_update_summary)));

// Next request doesn't ask to enable the extension.
let (request, _, _) = sliding_sync.generate_sync_request(TransactionId::new()).await?;
let (request, _, _) =
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
assert!(request.extensions.to_device.enabled.is_none());

let next = sync.next().await;
Expand All @@ -1154,7 +1168,8 @@ mod tests {
assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));

// Next request asks to enable the extension again.
let (request, _, _) = sliding_sync.generate_sync_request(TransactionId::new()).await?;
let (request, _, _) =
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
assert!(request.extensions.to_device.enabled.is_some());

Ok(())
Expand Down
53 changes: 48 additions & 5 deletions crates/matrix-sdk/src/sliding_sync/sticky_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,40 @@
use ruma::{OwnedTransactionId, TransactionId};

/// An `OwnedTransactionId` that is either initialized at creation, or
/// lazily-generated once.
#[derive(Debug)]
pub struct LazyTransactionId {
txn_id: Option<OwnedTransactionId>,
}

impl LazyTransactionId {
/// Create a new `LazyTransactionId`, not set.
pub fn new() -> Self {
Self { txn_id: None }
}

/// Get (or create it, if never set) a `TransactionId`.
pub fn get_or_create(&mut self) -> &TransactionId {
self.txn_id.get_or_insert_with(TransactionId::new)
}

/// Attempt to get the underlying `TransactionId` without creating it, if
/// missing.
pub fn get(&self) -> Option<&TransactionId> {
self.txn_id.as_deref()
}
}

#[cfg(test)]
impl LazyTransactionId {
/// Create a `LazyTransactionId` for a given known transaction id. For
/// testing only.
pub fn from_owned(owned: OwnedTransactionId) -> Self {
Self { txn_id: Some(owned) }
}
}

/// A trait to implement for data that can be sticky, given a context.
pub trait StickyData {
/// Request type that will be applied to, if the sticky parameters have been
Expand Down Expand Up @@ -72,8 +106,12 @@ impl<D: StickyData> SlidingSyncStickyManager<D> {
/// After receiving the response from this sliding sync, the caller MUST
/// also call [`Self::maybe_commit`] with the transaction id from the
/// server's response.
pub fn maybe_apply(&mut self, req: &mut D::Request, txn_id: &TransactionId) {
///
/// If no `txn_id` is provided, it will generate one that can be reused
/// later.
pub fn maybe_apply(&mut self, req: &mut D::Request, txn_id: &mut LazyTransactionId) {
if self.invalidated {
let txn_id = txn_id.get_or_create();
self.txn_id = Some(txn_id.to_owned());
self.data.apply(req);
}
Expand Down Expand Up @@ -116,23 +154,28 @@ mod tests {
assert!(sticky.is_invalidated());

let mut applied = false;
sticky.maybe_apply(&mut applied, "tid123".into());
let mut txn_id = LazyTransactionId::new();
sticky.maybe_apply(&mut applied, &mut txn_id);
assert!(applied);
assert!(sticky.is_invalidated());
assert!(txn_id.get().is_some(), "a transaction id was lazily generated");

// Committing with the wrong transaction id won't commit.
sticky.maybe_commit("tid456".into());
assert!(sticky.is_invalidated());

// Providing the correct transaction id will commit.
sticky.maybe_commit("tid123".into());
sticky.maybe_commit(txn_id.get().unwrap());
assert!(!sticky.is_invalidated());

// Applying without being invalidated won't do anything.
// Applying without being invalidated won't do anything, and not generate a
// transaction id.
let mut txn_id = LazyTransactionId::new();
let mut applied = false;
sticky.maybe_apply(&mut applied, "tid123".into());
sticky.maybe_apply(&mut applied, &mut txn_id);

assert!(!applied);
assert!(!sticky.is_invalidated());
assert!(txn_id.get().is_none());
}
}

0 comments on commit 1fd039c

Please sign in to comment.