From cc69efc3f8e306a88c853c8542f63963955cdcfe Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 9 Mar 2023 15:12:11 +0100 Subject: [PATCH 1/9] feat(sdk): Move `Client.root_span` inside `ClientInner`. --- crates/matrix-sdk/src/client/builder.rs | 5 ++-- crates/matrix-sdk/src/client/login_builder.rs | 4 +-- crates/matrix-sdk/src/client/mod.rs | 23 ++++++++------- crates/matrix-sdk/src/room/joined.rs | 28 +++++++++---------- crates/matrix-sdk/src/room/timeline/mod.rs | 2 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 +- crates/matrix-sdk/src/sliding_sync/room.rs | 2 +- 7 files changed, 35 insertions(+), 31 deletions(-) diff --git a/crates/matrix-sdk/src/client/builder.rs b/crates/matrix-sdk/src/client/builder.rs index 98d4e411ee3..33b836fa4e3 100644 --- a/crates/matrix-sdk/src/client/builder.rs +++ b/crates/matrix-sdk/src/client/builder.rs @@ -445,6 +445,7 @@ impl ClientBuilder { handle_refresh_tokens: self.handle_refresh_tokens, refresh_token_lock: Mutex::new(Ok(())), unknown_token_error_sender, + root_span: self.root_span, }); debug!("Done building the Client"); @@ -453,9 +454,9 @@ impl ClientBuilder { // only uploaded to a OpenTelemetry collector if the span gets dropped. // We still want it around so other methods that get called by this // client instance are connected to it, so we clone. - drop(self.root_span.clone()); + // drop(self.root_span.clone()); - Ok(Client { inner, root_span: self.root_span }) + Ok(Client { inner }) } } diff --git a/crates/matrix-sdk/src/client/login_builder.rs b/crates/matrix-sdk/src/client/login_builder.rs index b589c372e91..ee83b5ed44e 100644 --- a/crates/matrix-sdk/src/client/login_builder.rs +++ b/crates/matrix-sdk/src/client/login_builder.rs @@ -158,7 +158,7 @@ impl LoginBuilder { /// Instead of calling this function and `.await`ing its return value, you /// can also `.await` the `LoginBuilder` directly. #[instrument( - parent = &self.client.root_span, + parent = &self.client.inner.root_span, target = "matrix_sdk::client", name = "login", skip_all, @@ -296,7 +296,7 @@ where /// Instead of calling this function and `.await`ing its return value, you /// can also `.await` the `SsoLoginBuilder` directly. #[instrument( - parent = &self.client.root_span, + parent = &self.client.inner.root_span, target = "matrix_sdk::client", name = "login", skip_all, diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 1696b7f4d24..d5de7647acd 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -140,7 +140,6 @@ pub struct UnknownToken { #[derive(Clone)] pub struct Client { pub(crate) inner: Arc, - pub(crate) root_span: Span, } pub(crate) struct ClientInner { @@ -193,6 +192,8 @@ pub(crate) struct ClientInner { /// Client API UnknownToken error publisher. Allows the subscriber logout /// the user when any request fails because of an invalid access token pub(crate) unknown_token_error_sender: broadcast::Sender, + /// Root span for `tracing`. + pub(crate) root_span: Span, } #[cfg(not(tarpaulin_include))] @@ -1208,13 +1209,14 @@ impl Client { } } - self.root_span + self.inner + .root_span .record("user_id", display(&response.user_id)) .record("device_id", display(&response.device_id)); #[cfg(feature = "e2e-encryption")] if let Some(key) = self.encryption().ed25519_key().await { - self.root_span.record("ed25519_key", key); + self.inner.root_span.record("ed25519_key", key); } self.inner.base_client.receive_login_response(response).await?; @@ -1281,13 +1283,14 @@ impl Client { /// ``` /// /// [`login`]: #method.login - #[instrument(skip_all, parent = &self.root_span)] + #[instrument(skip_all, parent = &self.inner.root_span)] pub async fn restore_session(&self, session: Session) -> Result<()> { debug!("Restoring session"); let (meta, tokens) = session.into_parts(); - self.root_span + self.inner + .root_span .record("user_id", display(&meta.user_id)) .record("device_id", display(&meta.device_id)); @@ -1296,7 +1299,7 @@ impl Client { #[cfg(feature = "e2e-encryption")] if let Some(key) = self.encryption().ed25519_key().await { - self.root_span.record("ed25519_key", key); + self.inner.root_span.record("ed25519_key", key); } debug!("Done restoring session"); @@ -1483,7 +1486,7 @@ impl Client { /// client.register(request).await; /// # }) /// ``` - #[instrument(skip_all, parent = &self.root_span)] + #[instrument(skip_all, parent = &self.inner.root_span)] pub async fn register( &self, request: register::v3::Request, @@ -1546,7 +1549,7 @@ impl Client { /// /// let response = client.sync_once(sync_settings).await.unwrap(); /// # }); - #[instrument(skip(self, definition), parent = &self.root_span)] + #[instrument(skip(self, definition), parent = &self.inner.root_span)] pub async fn get_or_upload_filter( &self, filter_name: &str, @@ -2273,7 +2276,7 @@ impl Client { /// .await; /// }) /// ``` - #[instrument(skip_all, parent = &self.root_span)] + #[instrument(skip_all, parent = &self.inner.root_span)] pub async fn sync_with_callback( &self, sync_settings: crate::config::SyncSettings, @@ -2428,7 +2431,7 @@ impl Client { /// /// # anyhow::Ok(()) }); /// ``` - #[instrument(skip(self), parent = &self.root_span)] + #[instrument(skip(self), parent = &self.inner.root_span)] pub async fn sync_stream( &self, mut sync_settings: crate::config::SyncSettings, diff --git a/crates/matrix-sdk/src/room/joined.rs b/crates/matrix-sdk/src/room/joined.rs index 8fdd6d17151..86eb8eb8bbe 100644 --- a/crates/matrix-sdk/src/room/joined.rs +++ b/crates/matrix-sdk/src/room/joined.rs @@ -85,7 +85,7 @@ impl Joined { } /// Leave this room. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn leave(&self) -> Result { self.inner.leave().await } @@ -97,7 +97,7 @@ impl Joined { /// * `user_id` - The user to ban with `UserId`. /// /// * `reason` - The reason for banning this user. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> { let request = assign!( ban_user::v3::Request::new(self.inner.room_id().to_owned(), user_id.to_owned()), @@ -115,7 +115,7 @@ impl Joined { /// room. /// /// * `reason` - Optional reason why the room member is being kicked out. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> { let request = assign!( kick_user::v3::Request::new(self.inner.room_id().to_owned(), user_id.to_owned()), @@ -130,7 +130,7 @@ impl Joined { /// # Arguments /// /// * `user_id` - The `UserId` of the user to invite to the room. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> { let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() }; @@ -145,7 +145,7 @@ impl Joined { /// # Arguments /// /// * `invite_id` - A third party id of a user to invite to the room. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> { let recipient = InvitationRecipient::ThirdPartyId(invite_id); let request = invite_user::v3::Request::new(self.inner.room_id().to_owned(), recipient); @@ -219,7 +219,7 @@ impl Joined { Ok(()) } - #[instrument(name = "typing_notice", skip(self), parent = &self.client.root_span)] + #[instrument(name = "typing_notice", skip(self), parent = &self.client.inner.root_span)] async fn send_typing_notice(&self, typing: bool) -> Result<()> { let typing = if typing { self.client @@ -256,7 +256,7 @@ impl Joined { /// [`ReceiptType::FullyRead`]. /// /// * `event_id` - The `EventId` of the event to set the receipt on. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn send_single_receipt( &self, receipt_type: ReceiptType, @@ -281,7 +281,7 @@ impl Joined { /// * `receipts` - The `Receipts` to send. /// /// If `receipts` is empty, this is a no-op. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> { if receipts.is_empty() { return Ok(()); @@ -329,7 +329,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn enable_encryption(&self) -> Result<()> { use ruma::{ events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm, @@ -430,7 +430,7 @@ impl Joined { /// Warning: This waits until a sync happens and does not return if no sync /// is happening! It can also return early when the room is not a joined /// room anymore! - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn sync_up(&self) { while !self.is_synced() && self.room_type() == RoomType::Joined { self.client.inner.sync_beat.listen().wait_timeout(Duration::from_secs(1)); @@ -701,7 +701,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn send_attachment( &self, body: &str, @@ -901,7 +901,7 @@ impl Joined { /// joined_room.send_state_event(content).await?; /// # anyhow::Ok(()) }; /// ``` - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn send_state_event( &self, content: impl StateEventContent, @@ -1001,7 +1001,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn send_state_event_raw( &self, content: Value, @@ -1052,7 +1052,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn redact( &self, event_id: &EventId, diff --git a/crates/matrix-sdk/src/room/timeline/mod.rs b/crates/matrix-sdk/src/room/timeline/mod.rs index fa92d594bfa..6108270fc1b 100644 --- a/crates/matrix-sdk/src/room/timeline/mod.rs +++ b/crates/matrix-sdk/src/room/timeline/mod.rs @@ -263,7 +263,7 @@ impl Timeline { /// /// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned /// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent - #[instrument(skip(self, content), parent = &self.inner.room().client.root_span, fields(room_id = ?self.room().room_id()))] + #[instrument(skip(self, content), parent = &self.inner.room().client.inner.root_span, fields(room_id = ?self.room().room_id()))] pub async fn send(&self, content: AnyMessageLikeEventContent, txn_id: Option<&TransactionId>) { let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned); self.inner.handle_local_event(txn_id.clone(), content.clone()).await; diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 567c52ac49b..2c6726998bc 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1139,7 +1139,7 @@ impl SlidingSync { /// /// This stream will send requests and will handle responses automatically, /// hence updating the lists. - #[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.root_span)] + #[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.inner.root_span)] pub fn stream(&self) -> impl Stream> + '_ { // Collect all the lists that need to be updated. let list_generators = { diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index 7da145844f8..75d8c47912c 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -100,7 +100,7 @@ impl SlidingSyncRoom { /// /// Use `Timeline::latest_event` instead if you already have a timeline for /// this `SlidingSyncRoom`. - #[instrument(skip_all, parent = &self.client.root_span)] + #[instrument(skip_all, parent = &self.client.inner.root_span)] pub async fn latest_event(&self) -> Option { self.timeline_builder()?.build().await.latest_event().await } From 7f52292e921e2094162f7f087957cd15d02d9a31 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 9 Mar 2023 16:43:41 +0100 Subject: [PATCH 2/9] chore(sdk): Remove useless comment. --- crates/matrix-sdk/src/client/builder.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/crates/matrix-sdk/src/client/builder.rs b/crates/matrix-sdk/src/client/builder.rs index 33b836fa4e3..a4ec7370a68 100644 --- a/crates/matrix-sdk/src/client/builder.rs +++ b/crates/matrix-sdk/src/client/builder.rs @@ -450,12 +450,6 @@ impl ClientBuilder { debug!("Done building the Client"); - // We drop the root span here so it gets pushed to the subscribers, i.e. it gets - // only uploaded to a OpenTelemetry collector if the span gets dropped. - // We still want it around so other methods that get called by this - // client instance are connected to it, so we clone. - // drop(self.root_span.clone()); - Ok(Client { inner }) } } From 3511a4a053a95f9936ce6e26b345d46871170406 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 9 Mar 2023 16:40:26 +0100 Subject: [PATCH 3/9] fix(sdk): `SlidingSync` has a lock for both `pos` _and_ `delta_token`. This patch updates `SlidingSync.pos` and `.delta_token`. Each field has their own lock. It's annoying because they must updated at the same time to not be out-of-sync. So a new field `SlidingSync.position` of type `SlidingSyncPositionMarkers` is created. It holds `pos` and `delta_token. This new field is behind a single `RwLock`. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 26 ++++++----- crates/matrix-sdk/src/sliding_sync/mod.rs | 46 +++++++++++++------ 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 62d83974132..27e203c127c 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -17,7 +17,7 @@ use url::Url; use super::{ Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner, - SlidingSyncList, SlidingSyncListBuilder, SlidingSyncRoom, + SlidingSyncList, SlidingSyncListBuilder, SlidingSyncPositionMarkers, SlidingSyncRoom, }; use crate::{Client, Result}; @@ -232,7 +232,7 @@ impl SlidingSyncBuilder { pub async fn build(mut self) -> Result { let client = self.client.ok_or(Error::BuildMissingField("client"))?; - let mut delta_token_inner = None; + let mut delta_token = None; let mut rooms_found: BTreeMap = BTreeMap::new(); if let Some(storage_key) = &self.storage_key { @@ -261,12 +261,13 @@ impl SlidingSyncBuilder { } } - if let Some(FrozenSlidingSync { to_device_since, delta_token }) = client - .store() - .get_custom_value(storage_key.as_bytes()) - .await? - .map(|v| serde_json::from_slice::(&v)) - .transpose()? + if let Some(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token }) = + client + .store() + .get_custom_value(storage_key.as_bytes()) + .await? + .map(|v| serde_json::from_slice::(&v)) + .transpose()? { trace!("frozen for generic found"); @@ -278,7 +279,7 @@ impl SlidingSyncBuilder { } } - delta_token_inner = delta_token; + delta_token = frozen_delta_token; } trace!("sync unfrozen done"); @@ -300,8 +301,11 @@ impl SlidingSyncBuilder { extensions: Mutex::new(self.extensions), reset_counter: Default::default(), - pos: StdRwLock::new(Observable::new(None)), - delta_token: StdRwLock::new(Observable::new(delta_token_inner)), + position: StdRwLock::new(SlidingSyncPositionMarkers { + pos: Observable::new(None), + delta_token: Observable::new(delta_token), + }), + subscriptions: StdRwLock::new(self.subscriptions), unsubscribe: Default::default(), })) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 567c52ac49b..b19ececa92e 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -666,10 +666,8 @@ pub(super) struct SlidingSyncInner { /// The storage key to keep this cache at and load it from storage_key: Option, - /// The `pos` marker. - pos: StdRwLock>>, - - delta_token: StdRwLock>>, + /// The `pos` and `delta_token` markers. + position: StdRwLock, /// The lists of this Sliding Sync instance. lists: StdRwLock>, @@ -918,11 +916,11 @@ impl SlidingSync { mut sync_response: SyncResponse, list_generators: &mut BTreeMap, ) -> Result { - Observable::set(&mut self.inner.pos.write().unwrap(), Some(sliding_sync_response.pos)); - Observable::set( - &mut self.inner.delta_token.write().unwrap(), - sliding_sync_response.delta_token, - ); + { + let mut position_lock = self.inner.position.write().unwrap(); + Observable::set(&mut position_lock.pos, Some(sliding_sync_response.pos)); + Observable::set(&mut position_lock.delta_token, sliding_sync_response.delta_token); + } let update_summary = { let mut rooms = Vec::new(); @@ -1018,8 +1016,12 @@ impl SlidingSync { } } - let pos = self.inner.pos.read().unwrap().clone(); - let delta_token = self.inner.delta_token.read().unwrap().clone(); + let (pos, delta_token) = { + let position_lock = self.inner.position.read().unwrap(); + + (position_lock.pos.clone(), position_lock.delta_token.clone()) + }; + let room_subscriptions = self.inner.subscriptions.read().unwrap().clone(); let unsubscribe_rooms = mem::take(&mut *self.inner.unsubscribe.write().unwrap()); let timeout = Duration::from_secs(30); @@ -1198,7 +1200,11 @@ impl SlidingSync { warn!("Session expired. Restarting Sliding Sync."); // To “restart” a Sliding Sync session, we set `pos` to its initial value. - Observable::set(&mut self.inner.pos.write().unwrap(), None); + { + let mut position_lock = self.inner.position.write().unwrap(); + + Observable::set(&mut position_lock.pos, None); + } debug!(?self.inner.extensions, "Sliding Sync has been reset"); }); @@ -1218,15 +1224,25 @@ impl SlidingSync { impl SlidingSync { /// Get a copy of the `pos` value. pub fn pos(&self) -> Option { - self.inner.pos.read().unwrap().clone() + let position_lock = self.inner.position.read().unwrap(); + + position_lock.pos.clone() } /// Set a new value for `pos`. pub fn set_pos(&self, new_pos: String) { - Observable::set(&mut self.inner.pos.write().unwrap(), Some(new_pos)); + let mut position_lock = self.inner.position.write().unwrap(); + + Observable::set(&mut position_lock.pos, Some(new_pos)); } } +#[derive(Debug)] +pub(super) struct SlidingSyncPositionMarkers { + pos: Observable>, + delta_token: Observable>, +} + #[derive(Serialize, Deserialize)] struct FrozenSlidingSync { #[serde(skip_serializing_if = "Option::is_none")] @@ -1238,7 +1254,7 @@ struct FrozenSlidingSync { impl From<&SlidingSync> for FrozenSlidingSync { fn from(sliding_sync: &SlidingSync) -> Self { FrozenSlidingSync { - delta_token: sliding_sync.inner.delta_token.read().unwrap().clone(), + delta_token: sliding_sync.inner.position.read().unwrap().delta_token.clone(), to_device_since: sliding_sync .inner .extensions From 2bb66f0f2e0b7ec82c887e42c7f78093dca7603d Mon Sep 17 00:00:00 2001 From: Mauro <34335419+Velin92@users.noreply.github.com> Date: Fri, 10 Mar 2023 18:07:13 +0100 Subject: [PATCH 4/9] bindings: Expose `leave` and `reject_invitation` functions to FFI --- bindings/matrix-sdk-ffi/src/api.udl | 6 ++++++ bindings/matrix-sdk-ffi/src/room.rs | 30 +++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/bindings/matrix-sdk-ffi/src/api.udl b/bindings/matrix-sdk-ffi/src/api.udl index a85047e5765..83304018304 100644 --- a/bindings/matrix-sdk-ffi/src/api.udl +++ b/bindings/matrix-sdk-ffi/src/api.udl @@ -282,6 +282,12 @@ interface Room { [Throws=ClientError] void send_reaction(string event_id, string key); + + [Throws=ClientError] + void leave(); + + [Throws=ClientError] + void reject_invitation(); }; callback interface TimelineListener { diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index f011e63aa08..dc397b24074 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -501,6 +501,36 @@ impl Room { Ok(()) }) } + + /// Leaves the joined room. + /// + /// Will throw an error if used on an room that isn't in a joined state + pub fn leave(&self) -> Result<()> { + let room = match &self.room { + SdkRoom::Joined(j) => j.clone(), + _ => bail!("Can't leave a room that isn't in joined state"), + }; + + RUNTIME.block_on(async move { + room.leave().await?; + Ok(()) + }) + } + + /// Rejects invitation for the invited room. + /// + /// Will throw an error if used on an room that isn't in an invited state + pub fn reject_invitation(&self) -> Result<()> { + let room = match &self.room { + SdkRoom::Invited(i) => i.clone(), + _ => bail!("Can't reject an invite for a room that isn't in invited state"), + }; + + RUNTIME.block_on(async move { + room.reject_invitation().await?; + Ok(()) + }) + } } impl std::ops::Deref for Room { From b148ea2fb1ae9e2d9b03403e8f238407a129ceb7 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 10 Mar 2023 10:57:46 +0100 Subject: [PATCH 5/9] Bump locked version of clap To work around this clippy bug: https://github.com/rust-lang/rust-clippy/issues/10421 --- Cargo.lock | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94baec5b631..6fe0fdac899 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -700,12 +700,12 @@ dependencies = [ [[package]] name = "clap" -version = "4.1.6" +version = "4.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0b0588d44d4d63a87dbd75c136c166bbfd9a86a31cb89e09906521c7d3f5e3" +checksum = "c3d7ae14b20b94cb02149ed21a86c423859cbe18dc7ed69845cace50e52b40a5" dependencies = [ "bitflags", - "clap_derive 4.1.0", + "clap_derive 4.1.8", "clap_lex 0.3.1", "is-terminal", "once_cell", @@ -728,9 +728,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.1.0" +version = "4.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8" +checksum = "44bec8e5c9d09e439c4335b1af0abaab56dcf3b94999a936e1bb47b9134288f0" dependencies = [ "heck", "proc-macro-error", @@ -1391,7 +1391,7 @@ name = "example-emoji-verification" version = "0.1.0" dependencies = [ "anyhow", - "clap 4.1.6", + "clap 4.1.8", "futures", "matrix-sdk", "tokio", @@ -1463,7 +1463,7 @@ name = "example-timeline" version = "0.1.0" dependencies = [ "anyhow", - "clap 4.1.6", + "clap 4.1.8", "futures", "matrix-sdk", "tokio", @@ -2286,7 +2286,7 @@ version = "0.2.0" dependencies = [ "app_dirs2", "chrono", - "clap 4.1.6", + "clap 4.1.8", "dialoguer", "eyeball", "eyeball-im", @@ -6254,7 +6254,7 @@ name = "xtask" version = "0.1.0" dependencies = [ "camino", - "clap 4.1.6", + "clap 4.1.8", "fs_extra", "serde", "serde_json", From cc35ee72b80a520385405b288f3112b9c861ba77 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 10 Mar 2023 12:09:08 +0100 Subject: [PATCH 6/9] Upgrade eyeball to 0.4.0 --- Cargo.lock | 10 ++-- Cargo.toml | 2 +- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 2 +- crates/matrix-sdk-base/src/client.rs | 13 ++++-- crates/matrix-sdk-base/src/store/mod.rs | 10 ++-- .../src/verification/qrcode.rs | 36 +++++++-------- .../src/verification/requests.rs | 46 +++++++++---------- .../src/verification/sas/mod.rs | 18 ++++---- crates/matrix-sdk/src/client/mod.rs | 6 +-- crates/matrix-sdk/src/sliding_sync/builder.rs | 2 +- .../src/sliding_sync/list/builder.rs | 2 +- .../matrix-sdk/src/sliding_sync/list/mod.rs | 2 +- .../sliding_sync/list/request_generator.rs | 2 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 +- crates/matrix-sdk/src/sliding_sync/room.rs | 2 +- labs/jack-in/src/app/model.rs | 2 +- labs/jack-in/src/client/state.rs | 2 +- .../sliding-sync-integration-test/src/lib.rs | 2 +- 18 files changed, 81 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6fe0fdac899..232ca8abb44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1484,14 +1484,12 @@ dependencies = [ [[package]] name = "eyeball" -version = "0.1.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3609348664c9c1b07d9ff3933a466beaa4d197fb393b5d41ffda349458867626" +checksum = "3c7be1d67275032c662cadf525a79aef6909469579c5d81c69c148f7257257af" dependencies = [ "futures-core", "readlock", - "tokio", - "tokio-stream", ] [[package]] @@ -4133,9 +4131,9 @@ dependencies = [ [[package]] name = "readlock" -version = "0.1.1" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a8f0cb425ba44d6bde0d063097aae68a2ce31e1d5359e96427704f33f4f73d9" +checksum = "35c8a22130504d1f661d1bc373b424f2d45910fa5319132d903a4074e1527b2e" [[package]] name = "redox_syscall" diff --git a/Cargo.toml b/Cargo.toml index 56fff0ad7e5..e87e1db89ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ base64 = "0.21.0" byteorder = "1.4.3" ctor = "0.1.26" dashmap = "5.2.0" -eyeball = "0.1.4" +eyeball = "0.4.0" eyeball-im = "0.1.0" futures-util = { version = "0.3.26", default-features = false, features = ["alloc"] } http = "0.2.6" diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 072bee6fc76..fe372da9239 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, RwLock}; use anyhow::Context; -use eyeball::Observable; +use eyeball::unique::Observable; use eyeball_im::VectorDiff; use futures_util::{future::join, pin_mut, StreamExt}; use matrix_sdk::ruma::{ diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index c46b3ae023f..2afdd6f026a 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -17,12 +17,11 @@ use std::{ borrow::Borrow, collections::{BTreeMap, BTreeSet}, fmt, - sync::RwLockReadGuard as StdRwLockReadGuard, }; #[cfg(feature = "e2e-encryption")] use std::{ops::Deref, sync::Arc}; -use eyeball::Observable; +use eyeball::Subscriber; use matrix_sdk_common::{instant::Instant, locks::RwLock}; #[cfg(feature = "e2e-encryption")] use matrix_sdk_crypto::{ @@ -134,10 +133,14 @@ impl BaseClient { /// Get the session tokens. /// - /// If the client is currently logged in, this will return a + /// This returns a subscriber object that you can use both to + /// [`get`](SharedSubscriber::get) the current value as well as to react to + /// changes to the tokens. + /// + /// If the client is currently logged in, the inner value is a /// [`SessionTokens`] object which contains the access token and optional - /// refresh token. Otherwise it returns `None`. - pub fn session_tokens(&self) -> StdRwLockReadGuard<'_, Observable>> { + /// refresh token. Otherwise it is `None`. + pub fn session_tokens(&self) -> Subscriber> { self.store.session_tokens() } diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 88104e53933..11dc6b5121b 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -27,10 +27,10 @@ use std::{ pin::Pin, result::Result as StdResult, str::Utf8Error, - sync::{Arc, RwLockReadGuard as StdRwLockReadGuard}, + sync::Arc, }; -use eyeball::{Observable, SharedObservable}; +use eyeball::{shared::Observable as SharedObservable, Subscriber}; use once_cell::sync::OnceCell; #[cfg(any(test, feature = "testing"))] @@ -210,8 +210,8 @@ impl Store { /// The [`SessionTokens`] containing our access token and optional refresh /// token. - pub fn session_tokens(&self) -> StdRwLockReadGuard<'_, Observable>> { - self.session_tokens.read() + pub fn session_tokens(&self) -> Subscriber> { + self.session_tokens.subscribe() } /// Set the current [`SessionTokens`]. @@ -223,7 +223,7 @@ impl Store { /// token and optional refresh token. pub fn session(&self) -> Option { let meta = self.session_meta.get()?; - let tokens = self.session_tokens().clone()?; + let tokens = self.session_tokens().get()?; Some(Session::from_parts(meta.to_owned(), tokens)) } diff --git a/crates/matrix-sdk-crypto/src/verification/qrcode.rs b/crates/matrix-sdk-crypto/src/verification/qrcode.rs index 721da93ee05..f9f999dd1b5 100644 --- a/crates/matrix-sdk-crypto/src/verification/qrcode.rs +++ b/crates/matrix-sdk-crypto/src/verification/qrcode.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use eyeball::{Observable, SharedObservable}; +use eyeball::shared::{Observable as SharedObservable, ObservableWriteGuard}; use futures_core::Stream; use futures_util::StreamExt; use matrix_sdk_qrcode::{ @@ -157,12 +157,12 @@ impl QrVerification { /// When the verification object is in this state it's required that the /// user confirms that the other side has scanned the QR code. pub fn has_been_scanned(&self) -> bool { - matches!(**self.state.read(), InnerState::Scanned(_)) + matches!(*self.state.read(), InnerState::Scanned(_)) } /// Has the scanning of the QR code been confirmed by us. pub fn has_been_confirmed(&self) -> bool { - matches!(**self.state.read(), InnerState::Confirmed(_)) + matches!(*self.state.read(), InnerState::Confirmed(_)) } /// Get our own user id. @@ -189,7 +189,7 @@ impl QrVerification { /// Get info about the cancellation if the verification flow has been /// cancelled. pub fn cancel_info(&self) -> Option { - if let InnerState::Cancelled(c) = &**self.state.read() { + if let InnerState::Cancelled(c) = &*self.state.read() { Some(c.state.clone().into()) } else { None @@ -198,12 +198,12 @@ impl QrVerification { /// Has the verification flow completed. pub fn is_done(&self) -> bool { - matches!(**self.state.read(), InnerState::Done(_)) + matches!(*self.state.read(), InnerState::Done(_)) } /// Has the verification flow been cancelled. pub fn is_cancelled(&self) -> bool { - matches!(**self.state.read(), InnerState::Cancelled(_)) + matches!(*self.state.read(), InnerState::Cancelled(_)) } /// Is this a verification that is veryfying one of our own devices @@ -214,7 +214,7 @@ impl QrVerification { /// Have we successfully scanned the QR code and are able to send a /// reciprocation event. pub fn reciprocated(&self) -> bool { - matches!(**self.state.read(), InnerState::Reciprocated(_)) + matches!(*self.state.read(), InnerState::Reciprocated(_)) } /// Get the unique ID that identifies this QR code verification flow. @@ -277,13 +277,13 @@ impl QrVerification { let new_state = QrState::::new(true, code); let content = new_state.as_content(self.flow_id()); - match &**state { + match &*state { InnerState::Confirmed(_) | InnerState::Created(_) | InnerState::Scanned(_) | InnerState::Reciprocated(_) | InnerState::Done(_) => { - Observable::set(&mut state, InnerState::Cancelled(new_state)); + ObservableWriteGuard::set(&mut state, InnerState::Cancelled(new_state)); Some(self.content_to_request(content)) } InnerState::Cancelled(_) => None, @@ -296,7 +296,7 @@ impl QrVerification { /// This will return some `OutgoingContent` if the object is in the correct /// state to start the verification flow, otherwise `None`. pub fn reciprocate(&self) -> Option { - match &**self.state.read() { + match &*self.state.read() { InnerState::Reciprocated(s) => { Some(self.content_to_request(s.as_content(self.flow_id()))) } @@ -312,11 +312,11 @@ impl QrVerification { pub fn confirm_scanning(&self) -> Option { let mut state = self.state.write(); - match &**state { + match &*state { InnerState::Scanned(s) => { let new_state = s.clone().confirm_scanning(); let content = new_state.as_content(&self.flow_id); - Observable::set(&mut state, InnerState::Confirmed(new_state)); + ObservableWriteGuard::set(&mut state, InnerState::Confirmed(new_state)); Some(self.content_to_request(content)) } @@ -432,15 +432,15 @@ impl QrVerification { ) -> Option { let mut state = self.state.write(); - match &**state { + match &*state { InnerState::Created(s) => match s.clone().receive_reciprocate(content) { Ok(s) => { - Observable::set(&mut state, InnerState::Scanned(s)); + ObservableWriteGuard::set(&mut state, InnerState::Scanned(s)); None } Err(s) => { let content = s.as_content(self.flow_id()); - Observable::set(&mut state, InnerState::Cancelled(s)); + ObservableWriteGuard::set(&mut state, InnerState::Cancelled(s)); Some(self.content_to_request(content)) } }, @@ -456,7 +456,7 @@ impl QrVerification { if sender == self.other_user_id() { let mut state = self.state.write(); - let new_state = match &**state { + let new_state = match &*state { InnerState::Created(s) => s.clone().into_cancelled(content), InnerState::Scanned(s) => s.clone().into_cancelled(content), InnerState::Confirmed(s) => s.clone().into_cancelled(content), @@ -470,7 +470,7 @@ impl QrVerification { "Cancelling a QR verification, other user has cancelled" ); - Observable::set(&mut state, InnerState::Cancelled(new_state)); + ObservableWriteGuard::set(&mut state, InnerState::Cancelled(new_state)); } } @@ -670,7 +670,7 @@ impl QrVerification { /// To listen to changes to the [`QrVerificationState`] use the /// [`QrVerification::changes`] method. pub fn state(&self) -> QrVerificationState { - (&**self.state.read()).into() + (&*self.state.read()).into() } } diff --git a/crates/matrix-sdk-crypto/src/verification/requests.rs b/crates/matrix-sdk-crypto/src/verification/requests.rs index c2a199139cb..e63bcd5e325 100644 --- a/crates/matrix-sdk-crypto/src/verification/requests.rs +++ b/crates/matrix-sdk-crypto/src/verification/requests.rs @@ -14,7 +14,7 @@ use std::{sync::Arc, time::Duration}; -use eyeball::{Observable, SharedObservable}; +use eyeball::shared::{Observable as SharedObservable, ObservableWriteGuard}; use futures_core::Stream; use futures_util::StreamExt; use matrix_sdk_common::instant::Instant; @@ -159,7 +159,7 @@ impl RequestHandle { pub fn cancel_with_code(&self, cancel_code: &CancelCode) { let mut guard = self.inner.write(); if let Some(updated) = guard.cancel(true, cancel_code) { - Observable::set(&mut guard, updated); + ObservableWriteGuard::set(&mut guard, updated); } } } @@ -207,7 +207,7 @@ impl VerificationRequest { pub(crate) fn request_to_device(&self) -> ToDeviceRequest { let inner = self.inner.read(); - let methods = if let InnerRequest::Created(c) = &**inner { + let methods = if let InnerRequest::Created(c) = &*inner { c.state.our_methods.clone() } else { SUPPORTED_METHODS.to_vec() @@ -263,7 +263,7 @@ impl VerificationRequest { /// The id of the other device that is participating in this verification. pub fn other_device_id(&self) -> Option { - match &**self.inner.read() { + match &*self.inner.read() { InnerRequest::Requested(r) => Some(r.state.other_device_id.clone()), InnerRequest::Ready(r) => Some(r.state.other_device_id.clone()), InnerRequest::Created(_) @@ -284,7 +284,7 @@ impl VerificationRequest { /// Get info about the cancellation if the verification request has been /// cancelled. pub fn cancel_info(&self) -> Option { - if let InnerRequest::Cancelled(c) = &**self.inner.read() { + if let InnerRequest::Cancelled(c) = &*self.inner.read() { Some(c.state.clone().into()) } else { None @@ -293,12 +293,12 @@ impl VerificationRequest { /// Has the verification request been answered by another device. pub fn is_passive(&self) -> bool { - matches!(**self.inner.read(), InnerRequest::Passive(_)) + matches!(*self.inner.read(), InnerRequest::Passive(_)) } /// Is the verification request ready to start a verification flow. pub fn is_ready(&self) -> bool { - matches!(**self.inner.read(), InnerRequest::Ready(_)) + matches!(*self.inner.read(), InnerRequest::Ready(_)) } /// Has the verification flow timed out. @@ -311,7 +311,7 @@ impl VerificationRequest { /// Will be present only if the other side requested the verification or if /// we're in the ready state. pub fn their_supported_methods(&self) -> Option> { - match &**self.inner.read() { + match &*self.inner.read() { InnerRequest::Requested(r) => Some(r.state.their_methods.clone()), InnerRequest::Ready(r) => Some(r.state.their_methods.clone()), InnerRequest::Created(_) @@ -326,7 +326,7 @@ impl VerificationRequest { /// Will be present only we requested the verification or if we're in the /// ready state. pub fn our_supported_methods(&self) -> Option> { - match &**self.inner.read() { + match &*self.inner.read() { InnerRequest::Created(r) => Some(r.state.our_methods.clone()), InnerRequest::Ready(r) => Some(r.state.our_methods.clone()), InnerRequest::Requested(_) @@ -353,20 +353,20 @@ impl VerificationRequest { /// Has the verification flow that was started with this request finished. pub fn is_done(&self) -> bool { - matches!(**self.inner.read(), InnerRequest::Done(_)) + matches!(*self.inner.read(), InnerRequest::Done(_)) } /// Has the verification flow that was started with this request been /// cancelled. pub fn is_cancelled(&self) -> bool { - matches!(**self.inner.read(), InnerRequest::Cancelled(_)) + matches!(*self.inner.read(), InnerRequest::Cancelled(_)) } /// Generate a QR code that can be used by another client to start a QR code /// based verification. #[cfg(feature = "qrcode")] pub async fn generate_qr_code(&self) -> Result, CryptoStoreError> { - let inner = self.inner.get(); + let inner = self.inner.read(); inner.generate_qr_code(self.we_started, self.inner.clone().into()).await } @@ -383,7 +383,7 @@ impl VerificationRequest { &self, data: QrVerificationData, ) -> Result, ScanError> { - let future = if let InnerRequest::Ready(r) = &**self.inner.read() { + let future = if let InnerRequest::Ready(r) = &*self.inner.read() { QrVerification::from_scan( r.store.clone(), r.other_user_id.clone(), @@ -465,7 +465,7 @@ impl VerificationRequest { return None; }; - Observable::set(&mut guard, updated); + ObservableWriteGuard::set(&mut guard, updated); let request = match content { OutgoingContent::ToDevice(content) => ToDeviceRequest::with_id( @@ -507,14 +507,14 @@ impl VerificationRequest { fn cancel_with_code(&self, cancel_code: CancelCode) -> Option { let mut guard = self.inner.write(); - let send_to_everyone = self.we_started() && matches!(**guard, InnerRequest::Created(_)); + let send_to_everyone = self.we_started() && matches!(*guard, InnerRequest::Created(_)); let other_device = guard.other_device_id(); if let Some(updated) = guard.cancel(true, &cancel_code) { - Observable::set(&mut guard, updated); + ObservableWriteGuard::set(&mut guard, updated); } - let content = if let InnerRequest::Cancelled(c) = &**guard { + let content = if let InnerRequest::Cancelled(c) = &*guard { Some(c.state.as_content(self.flow_id())) } else { None @@ -632,10 +632,10 @@ impl VerificationRequest { pub(crate) fn receive_ready(&self, sender: &UserId, content: &ReadyContent<'_>) { let mut guard = self.inner.write(); - match &**guard { + match &*guard { InnerRequest::Created(s) => { let new_value = InnerRequest::Ready(s.clone().into_ready(sender, content)); - Observable::set(&mut guard, new_value); + ObservableWriteGuard::set(&mut guard, new_value); if let Some(request) = self.cancel_for_other_devices(CancelCode::Accepted, Some(content.from_device())) @@ -647,7 +647,7 @@ impl VerificationRequest { if sender == self.own_user_id() && content.from_device() != self.account.device_id() { let new_value = InnerRequest::Passive(s.clone().into_passive(content)); - Observable::set(&mut guard, new_value); + ObservableWriteGuard::set(&mut guard, new_value); } } InnerRequest::Ready(_) @@ -686,7 +686,7 @@ impl VerificationRequest { let mut guard = self.inner.write(); if let Some(updated) = guard.receive_done(content) { - Observable::set(&mut guard, updated); + ObservableWriteGuard::set(&mut guard, updated); } } } @@ -703,7 +703,7 @@ impl VerificationRequest { ); let mut guard = self.inner.write(); if let Some(updated) = guard.cancel(false, content.cancel_code()) { - Observable::set(&mut guard, updated); + ObservableWriteGuard::set(&mut guard, updated); } if self.we_started() { @@ -781,7 +781,7 @@ impl VerificationRequest { /// To listen to changes to the [`VerificationRequestState`] use the /// [`VerificationRequest::changes`] method. pub fn state(&self) -> VerificationRequestState { - (&**self.inner.read()).into() + (&*self.inner.read()).into() } } diff --git a/crates/matrix-sdk-crypto/src/verification/sas/mod.rs b/crates/matrix-sdk-crypto/src/verification/sas/mod.rs index 87f4717f81f..efdf366c2b3 100644 --- a/crates/matrix-sdk-crypto/src/verification/sas/mod.rs +++ b/crates/matrix-sdk-crypto/src/verification/sas/mod.rs @@ -18,7 +18,7 @@ mod sas_state; use std::sync::Arc; -use eyeball::{Observable, SharedObservable}; +use eyeball::shared::{Observable as SharedObservable, ObservableWriteGuard}; use futures_core::Stream; use futures_util::StreamExt; use inner_sas::InnerSas; @@ -294,7 +294,7 @@ impl Sas { /// Get info about the cancellation if the verification flow has been /// cancelled. pub fn cancel_info(&self) -> Option { - if let InnerSas::Cancelled(c) = &**self.inner.read() { + if let InnerSas::Cancelled(c) = &*self.inner.read() { Some(c.state.as_ref().clone().into()) } else { None @@ -452,7 +452,7 @@ impl Sas { let methods = settings.allowed_methods; if let Some((sas, content)) = sas.accept(methods) { - Observable::set(&mut guard, sas); + ObservableWriteGuard::set(&mut guard, sas); Some(match content { OwnedAcceptContent::ToDevice(c) => { @@ -500,7 +500,7 @@ impl Sas { let sas: InnerSas = (*guard).clone(); let (sas, contents) = sas.confirm(); - Observable::set(&mut guard, sas); + ObservableWriteGuard::set(&mut guard, sas); (contents, guard.is_done()) }; @@ -575,7 +575,7 @@ impl Sas { let sas: InnerSas = (*guard).clone(); let (sas, content) = sas.cancel(true, code); - Observable::set(&mut guard, sas); + ObservableWriteGuard::set(&mut guard, sas); content.map(|c| match c { OutgoingContent::Room(room_id, content) => { @@ -739,11 +739,11 @@ impl Sas { /// Get the current state of the verification process. pub fn state(&self) -> SasState { - (&**self.inner.read()).into() + (&*self.inner.read()).into() } fn state_debug(&self) -> State { - (&**self.inner.read()).into() + (&*self.inner.read()).into() } pub(crate) fn receive_any_event( @@ -758,7 +758,7 @@ impl Sas { let sas: InnerSas = (*guard).clone(); let (sas, content) = sas.receive_any_event(sender, content); - Observable::set(&mut guard, sas); + ObservableWriteGuard::set(&mut guard, sas); content }; @@ -783,7 +783,7 @@ impl Sas { let sas: InnerSas = (*guard).clone(); if let Some(sas) = sas.mark_request_as_sent(request_id) { - Observable::set(&mut guard, sas); + ObservableWriteGuard::set(&mut guard, sas); } else { error!( flow_id = self.flow_id().as_str(), diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index d5de7647acd..d1934173cf4 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -25,7 +25,6 @@ use std::{ #[cfg(target_arch = "wasm32")] use async_once_cell::OnceCell; use dashmap::DashMap; -use eyeball::Observable; use futures_core::Stream; use futures_util::StreamExt; use matrix_sdk_base::{ @@ -367,7 +366,7 @@ impl Client { /// /// [refreshing access tokens]: https://spec.matrix.org/v1.3/client-server-api/#refreshing-access-tokens pub fn session_tokens(&self) -> Option { - self.base_client().session_tokens().clone() + self.base_client().session_tokens().get() } /// Get the current access token for this session. @@ -502,7 +501,7 @@ impl Client { /// /// [refreshing access tokens]: https://spec.matrix.org/v1.3/client-server-api/#refreshing-access-tokens pub fn session_tokens_stream(&self) -> impl Stream> { - Observable::subscribe(&self.base_client().session_tokens()) + self.base_client().session_tokens() } /// Get the whole session info of this client. @@ -2431,6 +2430,7 @@ impl Client { /// /// # anyhow::Ok(()) }); /// ``` + #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro #[instrument(skip(self), parent = &self.inner.root_span)] pub async fn sync_stream( &self, diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 27e203c127c..1147874876d 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -4,7 +4,7 @@ use std::{ sync::{Mutex, RwLock as StdRwLock}, }; -use eyeball::Observable; +use eyeball::unique::Observable; use ruma::{ api::client::sync::sync_events::v4::{ self, AccountDataConfig, E2EEConfig, ExtensionsConfig, ReceiptsConfig, ToDeviceConfig, diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index 36cd52f2e08..772e4d6fcd5 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -5,7 +5,7 @@ use std::{ sync::{atomic::AtomicBool, Arc, RwLock as StdRwLock}, }; -use eyeball::Observable; +use eyeball::unique::Observable; use eyeball_im::ObservableVector; use im::Vector; use ruma::{api::client::sync::sync_events::v4, events::StateEventType, UInt}; diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index d221a7530bc..af7c706d3e0 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -12,7 +12,7 @@ use std::{ }; pub use builder::*; -use eyeball::Observable; +use eyeball::unique::Observable; use eyeball_im::{ObservableVector, VectorDiff}; use futures_core::Stream; use im::Vector; diff --git a/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs b/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs index b7dc95b15b5..76281e0b5c0 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs @@ -1,6 +1,6 @@ use std::cmp::min; -use eyeball::Observable; +use eyeball::unique::Observable; use ruma::{api::client::sync::sync_events::v4, assign, OwnedRoomId, UInt}; use tracing::{error, instrument, trace}; diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index b00cf0d9b36..7fdfc5755b7 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -611,7 +611,7 @@ use std::{ pub use builder::*; pub use client::*; pub use error::*; -use eyeball::Observable; +use eyeball::unique::Observable; use futures_core::stream::Stream; pub use list::*; use matrix_sdk_base::sync::SyncResponse; diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index 75d8c47912c..19c1e272801 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -7,7 +7,7 @@ use std::{ }, }; -use eyeball::Observable; +use eyeball::unique::Observable; use eyeball_im::ObservableVector; use im::Vector; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; diff --git a/labs/jack-in/src/app/model.rs b/labs/jack-in/src/app/model.rs index 1c626db602a..2f833ec996e 100644 --- a/labs/jack-in/src/app/model.rs +++ b/labs/jack-in/src/app/model.rs @@ -212,7 +212,7 @@ impl Update for Model { None } Msg::SendMessage(m) => { - if let Some(tl) = &**self.sliding_sync.room_timeline.read() { + if let Some(tl) = &*self.sliding_sync.room_timeline.read() { block_on(async move { // fire and forget tl.send(RoomMessageEventContent::text_plain(m).into(), None).await; diff --git a/labs/jack-in/src/client/state.rs b/labs/jack-in/src/client/state.rs index 01ddd9bed84..5f60af891ca 100644 --- a/labs/jack-in/src/client/state.rs +++ b/labs/jack-in/src/client/state.rs @@ -3,7 +3,7 @@ use std::{ time::{Duration, Instant}, }; -use eyeball::SharedObservable; +use eyeball::shared::Observable as SharedObservable; use eyeball_im::{ObservableVector, VectorDiff}; use futures::{pin_mut, StreamExt}; use matrix_sdk::{ diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs index 8aa4a883f07..90bd8b35481 100644 --- a/testing/sliding-sync-integration-test/src/lib.rs +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -73,7 +73,7 @@ mod tests { use anyhow::{bail, Context}; use assert_matches::assert_matches; - use eyeball::Observable; + use eyeball::unique::Observable; use eyeball_im::VectorDiff; use futures::{pin_mut, stream::StreamExt}; use matrix_sdk::{ From 595e34aad5b839039faf1943c18144b279ecc92c Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Sat, 11 Mar 2023 12:31:24 +0100 Subject: [PATCH 7/9] Silence new clippy lint for generated code --- crates/matrix-sdk-base/src/client.rs | 2 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 2afdd6f026a..2fdf4735604 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -134,7 +134,7 @@ impl BaseClient { /// Get the session tokens. /// /// This returns a subscriber object that you can use both to - /// [`get`](SharedSubscriber::get) the current value as well as to react to + /// [`get`](Subscriber::get) the current value as well as to react to /// changes to the tokens. /// /// If the client is currently logged in, the inner value is a diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 7fdfc5755b7..815db31b882 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1141,8 +1141,9 @@ impl SlidingSync { /// /// This stream will send requests and will handle responses automatically, /// hence updating the lists. + #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro #[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.inner.root_span)] - pub fn stream(&self) -> impl Stream> + '_ { + pub fn stream<'a>(&'a self) -> impl Stream> + 'a { // Collect all the lists that need to be updated. let list_generators = { let mut list_generators = BTreeMap::new(); From 8d83a996a69ffcfca786dbc343dc9b862a9965df Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Sat, 11 Mar 2023 12:45:09 +0100 Subject: [PATCH 8/9] Handle another new clippy lint Issue about false positives: https://github.com/rust-lang/rust-clippy/issues/10482 --- bindings/matrix-sdk-ffi/src/authentication_service.rs | 2 +- bindings/matrix-sdk-ffi/src/client.rs | 3 ++- bindings/matrix-sdk-ffi/src/room.rs | 3 ++- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 4 ++-- crates/matrix-sdk-crypto/src/identities/manager.rs | 1 + crates/matrix-sdk/tests/integration/room/joined.rs | 1 + crates/matrix-sdk/tests/integration/room/timeline.rs | 1 + 7 files changed, 10 insertions(+), 5 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/authentication_service.rs b/bindings/matrix-sdk-ffi/src/authentication_service.rs index bce8f761e2d..b6313cdc693 100644 --- a/bindings/matrix-sdk-ffi/src/authentication_service.rs +++ b/bindings/matrix-sdk-ffi/src/authentication_service.rs @@ -147,7 +147,7 @@ impl AuthenticationService { }) .map_err(AuthenticationError::from)?; - let details = RUNTIME.block_on(async { self.details_from_client(&client).await })?; + let details = RUNTIME.block_on(self.details_from_client(&client))?; // Now we've verified that it's a valid homeserver, make sure // there's a sliding sync proxy available one way or another. diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 7176bd88abb..77499e41338 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -377,7 +377,8 @@ impl Client { impl Client { /// The homeserver this client is configured to use. pub fn homeserver(&self) -> String { - RUNTIME.block_on(async move { self.async_homeserver().await }) + #[allow(unknown_lints, clippy::redundant_async_block)] // false positive + RUNTIME.block_on(self.async_homeserver()) } pub fn rooms(&self) -> Vec> { diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index dc397b24074..7f8ec161b82 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -250,7 +250,8 @@ impl Room { .unwrap() .get_or_insert_with(|| { let room = self.room.clone(); - let timeline = RUNTIME.block_on(async move { room.timeline().await }); + #[allow(unknown_lints, clippy::redundant_async_block)] // false positive + let timeline = RUNTIME.block_on(room.timeline()); Arc::new(timeline) }) .clone(); diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index fe372da9239..4a70a269523 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -194,8 +194,8 @@ impl SlidingSyncRoom { } }; - let (timeline_items, timeline_stream) = - RUNTIME.block_on(async { timeline.subscribe().await }); + #[allow(unknown_lints, clippy::redundant_async_block)] // false positive + let (timeline_items, timeline_stream) = RUNTIME.block_on(timeline.subscribe()); let handle_events = async move { let listener: Arc = listener.into(); diff --git a/crates/matrix-sdk-crypto/src/identities/manager.rs b/crates/matrix-sdk-crypto/src/identities/manager.rs index 1943e4d2309..41bedaca7b1 100644 --- a/crates/matrix-sdk-crypto/src/identities/manager.rs +++ b/crates/matrix-sdk-crypto/src/identities/manager.rs @@ -1066,6 +1066,7 @@ pub(crate) mod tests { let listener = manager.listen_for_received_queries(); + #[allow(unknown_lints, clippy::redundant_async_block)] // false positive let task = tokio::task::spawn(async move { listener.wait(Duration::from_secs(10)).await }); manager diff --git a/crates/matrix-sdk/tests/integration/room/joined.rs b/crates/matrix-sdk/tests/integration/room/joined.rs index 9affccf8eb0..5df69e9c6ad 100644 --- a/crates/matrix-sdk/tests/integration/room/joined.rs +++ b/crates/matrix-sdk/tests/integration/room/joined.rs @@ -543,6 +543,7 @@ async fn fetch_members_deduplication() { // Create N tasks that try to fetch the members. for _ in 0..5 { + #[allow(unknown_lints, clippy::redundant_async_block)] // false positive let task = tokio::spawn({ let room = room.clone(); async move { room.sync_members().await } diff --git a/crates/matrix-sdk/tests/integration/room/timeline.rs b/crates/matrix-sdk/tests/integration/room/timeline.rs index b06fa4eb1d5..e56533050d6 100644 --- a/crates/matrix-sdk/tests/integration/room/timeline.rs +++ b/crates/matrix-sdk/tests/integration/room/timeline.rs @@ -180,6 +180,7 @@ async fn echo() { // Don't move the original timeline, it must live until the end of the test let timeline = timeline.clone(); + #[allow(unknown_lints, clippy::redundant_async_block)] // false positive let send_hdl = spawn(async move { timeline .send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)) From 3c25ad1489727a53f3c4dfeb581869806417ba6f Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 13 Mar 2023 09:05:32 +0100 Subject: [PATCH 9/9] doc(sdk): Remove a `FIXME` that is actually fixed. The #1386 issue is closed. Yepee. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 815db31b882..a3b0a94a715 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1056,10 +1056,6 @@ impl SlidingSync { // Sending the `/sync` request out when end-to-end encryption is enabled means // that we need to also send out any outgoing e2ee related request out // coming from the `OlmMachine::outgoing_requests()` method. - // - // FIXME: Processing outgiong requests at the same time while a `/sync` is in - // flight is currently not supported. - // More info: [#1386](https://github.com/matrix-org/matrix-rust-sdk/issues/1386). #[cfg(feature = "e2e-encryption")] let response = { let (e2ee_uploads, response) =