Skip to content

Commit

Permalink
Rewrite wait_if_user_pending to fix races
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Mar 9, 2023
1 parent 508f1bc commit 16871d1
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 12 deletions.
164 changes: 164 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/matrix-sdk-crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ testing = ["dep:http"]
[dependencies]
aes = "0.8.1"
atomic = "0.5.1"
async-std = { version = "1.12.0", features = ["unstable"] }
async-trait = { workspace = true }
base64 = { workspace = true }
bs58 = { version = "0.4.0", optional = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/matrix-sdk-crypto/src/identities/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub(crate) struct KeysQueryListener {
pub(crate) enum UserKeyQueryResult {
WasPending,
WasNotPending,

/// A query was pending, but we gave up waiting
TimeoutExpired,
}

impl KeysQueryListener {
Expand Down
4 changes: 1 addition & 3 deletions crates/matrix-sdk-crypto/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,9 +1259,7 @@ impl OlmMachine {

async fn wait_if_user_pending(&self, user_id: &UserId, timeout: Option<Duration>) {
if let Some(timeout) = timeout {
let listener = self.identity_manager.listen_for_received_queries();

let _ = listener.wait_if_user_pending(timeout, user_id).await;
self.store.wait_if_user_key_query_pending(timeout, user_id).await;
}
}

Expand Down
67 changes: 62 additions & 5 deletions crates/matrix-sdk-crypto/src/session_manager/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ impl SessionManager {

let user_devices = if user_devices.is_empty() {
match self
.keys_query_listener
.wait_if_user_pending(Self::KEYS_QUERY_WAIT_TIME, user_id)
.store
.wait_if_user_key_query_pending(Self::KEYS_QUERY_WAIT_TIME, user_id)
.await
{
Ok(WasPending) => self.store.get_readonly_devices_filtered(user_id).await?,
WasPending => self.store.get_readonly_devices_filtered(user_id).await?,
_ => user_devices,
}
} else {
Expand Down Expand Up @@ -404,15 +404,22 @@ mod tests {
use matrix_sdk_common::locks::Mutex;
use matrix_sdk_test::{async_test, response_from_file};
use ruma::{
api::{client::keys::claim_keys::v3::Response as KeyClaimResponse, IncomingResponse},
api::{
client::keys::{
claim_keys::v3::Response as KeyClaimResponse,
get_keys::v3::Response as KeysQueryResponse,
},
IncomingResponse,
},
device_id, user_id, DeviceId, UserId,
};
use serde_json::json;
use tracing::info;

use super::SessionManager;
use crate::{
gossiping::GossipMachine,
identities::{KeysQueryListener, ReadOnlyDevice},
identities::{IdentityManager, KeysQueryListener, ReadOnlyDevice},
olm::{Account, PrivateCrossSigningIdentity, ReadOnlyAccount},
session_manager::GroupSessionCache,
store::{IntoCryptoStore, MemoryStore, Store},
Expand Down Expand Up @@ -528,6 +535,56 @@ mod tests {
assert!(manager.get_missing_sessions(iter::once(bob.user_id())).await.unwrap().is_none());
}

#[async_test]
async fn session_creation_waits_for_keys_query() {
let manager = session_manager().await;
let identity_manager = IdentityManager::new(
manager.account.user_id.clone(),
manager.account.device_id.clone(),
manager.store.clone(),
);

// start a keys query request. At this point, we are only interested in our own
// devices.
let (key_query_txn_id, key_query_request) =
identity_manager.users_for_key_query().await.unwrap().pop().unwrap();
info!("Initial key query: {:?}", key_query_request);

// now bob turns up, and we start tracking his devices...
let bob = bob_account();
let bob_device = ReadOnlyDevice::from_account(&bob).await;
manager.store.update_tracked_users(iter::once(bob.user_id())).await.unwrap();

// ... and start off an attempt to get the missing sessions. This should block
// for now.
let missing_sessions_future = manager.get_missing_sessions(iter::once(bob.user_id()));

// the initial keys query completes, and we start another
let response_json = json!({ "device_keys": { manager.account.user_id(): {}}});
let response =
KeysQueryResponse::try_from_http_response(response_from_file(&response_json)).unwrap();
identity_manager.receive_keys_query_response(&key_query_txn_id, &response).await.unwrap();

let (key_query_txn_id, key_query_request) =
identity_manager.users_for_key_query().await.unwrap().pop().unwrap();
info!("Second key query: {:?}", key_query_request);

// that second request completes with info on bob's device
let response_json = json!({ "device_keys": { bob.user_id(): {
bob_device.device_id(): bob_device.as_device_keys()
}}});
let response =
KeysQueryResponse::try_from_http_response(response_from_file(&response_json)).unwrap();
identity_manager.receive_keys_query_response(&key_query_txn_id, &response).await.unwrap();

// the missing_sessions_future should now finally complete, with a claim
// including bob's device
let (_, keys_claim_request) = missing_sessions_future.await.unwrap().unwrap();
//info!("Key claim: {:?}", keys_claim_request);
let bob_key_claims = keys_claim_request.one_time_keys.get(bob.user_id()).unwrap();
assert!(bob_key_claims.contains_key(bob_device.device_id()));
}

// This test doesn't run on macos because we're modifying the session
// creation time so we can get around the UNWEDGING_INTERVAL.
#[async_test]
Expand Down
Loading

0 comments on commit 16871d1

Please sign in to comment.