Skip to content

Commit

Permalink
sdk: Remove HttpSend in favor of allowing reqwest customization
Browse files Browse the repository at this point in the history
  • Loading branch information
jplatte committed Jun 20, 2023
1 parent ba9d829 commit 4c1a351
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 168 deletions.
2 changes: 1 addition & 1 deletion crates/matrix-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
and listen to changes in the state of the `VerificationRequest`. This removes
the need to listen to individual matrix events once the `VerificationRequest`
object has been acquired.

- `Common::members` and `Common::members_no_sync` take a `RoomMemberships` to be able to filter the
results by any membership state.
- `Common::active_members(_no_sync)` and `Common::joined_members(_no_sync)` are deprecated.
Expand All @@ -17,6 +16,7 @@
- Add `Client::rooms_filtered`
- Replace `Client::authentication_issuer` with `Client::authentication_server_info` that contains
all the fields discovered from the homeserver for authenticating with OIDC
- Remove `HttpSend` trait in favor of allowing a custom `reqwest::Client` instance to be supplied

# 0.6.2

Expand Down
45 changes: 21 additions & 24 deletions crates/matrix-sdk/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ use tracing::{debug, field::debug, instrument, Span};
use url::Url;

use super::{Client, ClientInner};
use crate::{
config::RequestConfig,
error::RumaApiError,
http_client::{HttpClient, HttpSend, HttpSettings},
HttpError,
};
#[cfg(not(target_arch = "wasm32"))]
use crate::http_client::HttpSettings;
use crate::{config::RequestConfig, error::RumaApiError, http_client::HttpClient, HttpError};

/// Builder that allows creating and configuring various parts of a [`Client`].
///
Expand Down Expand Up @@ -70,7 +67,7 @@ use crate::{
/// .user_agent("MyApp/v3.0");
///
/// let client_builder =
/// Client::builder().http_client(Arc::new(reqwest_builder.build()?));
/// Client::builder().http_client(reqwest_builder.build()?);
/// # anyhow::Ok(())
/// ```
#[must_use]
Expand Down Expand Up @@ -226,15 +223,13 @@ impl ClientBuilder {
self
}

/// Specify an HTTP client to handle sending requests and receiving
/// responses.
/// Specify a [`reqwest::Client`] instance to handle sending requests and
/// receiving responses.
///
/// Any type that implements the `HttpSend` trait can be used to send /
/// receive `http` types.
///
/// This method is mutually exclusive with
/// [`user_agent()`][Self::user_agent],
pub fn http_client(mut self, client: Arc<dyn HttpSend>) -> Self {
/// This method is mutually exclusive with [`proxy()`][Self::proxy],
/// [`disable_ssl_verification`][Self::disable_ssl_verification] and
/// [`user_agent()`][Self::user_agent].
pub fn http_client(mut self, client: reqwest::Client) -> Self {
self.http_cfg = Some(HttpConfig::Custom(client));
self
}
Expand Down Expand Up @@ -325,15 +320,12 @@ impl ClientBuilder {
let homeserver_cfg = self.homeserver_cfg.ok_or(ClientBuildError::MissingHomeserver)?;
Span::current().record("homeserver", debug(&homeserver_cfg));

#[cfg_attr(target_arch = "wasm32", allow(clippy::infallible_destructuring_match))]
let inner_http_client = match self.http_cfg.unwrap_or_default() {
#[allow(unused_mut)]
#[cfg(not(target_arch = "wasm32"))]
HttpConfig::Settings(mut settings) => {
#[cfg(not(target_arch = "wasm32"))]
{
settings.timeout = self.request_config.timeout;
}

Arc::new(settings.make_client()?)
settings.timeout = self.request_config.timeout;
settings.make_client()?
}
HttpConfig::Custom(c) => c,
};
Expand Down Expand Up @@ -449,8 +441,9 @@ enum HomeserverConfig {

#[derive(Clone, Debug)]
enum HttpConfig {
#[cfg(not(target_arch = "wasm32"))]
Settings(HttpSettings),
Custom(Arc<dyn HttpSend>),
Custom(reqwest::Client),
}

#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -471,7 +464,11 @@ impl HttpConfig {

impl Default for HttpConfig {
fn default() -> Self {
Self::Settings(HttpSettings::default())
#[cfg(not(target_arch = "wasm32"))]
return Self::Settings(HttpSettings::default());

#[cfg(target_arch = "wasm32")]
return Self::Custom(reqwest::Client::new());
}
}

Expand Down
199 changes: 57 additions & 142 deletions crates/matrix-sdk/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ use std::{
time::Duration,
};

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use bytesize::ByteSize;
use eyeball::shared::Observable as SharedObservable;
use matrix_sdk_common::AsyncTraitDeps;
use ruma::{
api::{
error::{FromHttpResponseError, IntoHttpError},
Expand All @@ -41,82 +39,15 @@ use crate::{config::RequestConfig, error::HttpError};

pub(crate) const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

/// Abstraction around the http layer. The allows implementors to use different
/// http libraries.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait HttpSend: AsyncTraitDeps {
/// The method abstracting sending request types and receiving response
/// types.
///
/// This is called by the client every time it wants to send anything to a
/// homeserver.
///
/// # Arguments
///
/// * `request` - The http request that has been converted from a ruma
/// `Request`.
///
/// * `timeout` - A timeout for the full request > response cycle.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use eyeball::shared::Observable as SharedObservable;
/// use matrix_sdk::{
/// async_trait, bytes::Bytes, HttpError, HttpSend, TransmissionProgress,
/// };
///
/// #[derive(Debug)]
/// struct Client(reqwest::Client);
///
/// impl Client {
/// async fn response_to_http_response(
/// &self,
/// mut response: reqwest::Response,
/// ) -> Result<http::Response<Bytes>, HttpError> {
/// // Convert the reqwest response to a http one.
/// todo!()
/// }
/// }
///
/// #[async_trait]
/// impl HttpSend for Client {
/// async fn send_request(
/// &self,
/// request: http::Request<Bytes>,
/// timeout: Duration,
/// _send_progress: SharedObservable<TransmissionProgress>,
/// ) -> Result<http::Response<Bytes>, HttpError> {
/// Ok(self
/// .response_to_http_response(
/// self.0
/// .execute(reqwest::Request::try_from(request)?)
/// .await?,
/// )
/// .await?)
/// }
/// }
/// ```
async fn send_request(
&self,
request: http::Request<Bytes>,
timeout: Duration,
send_progress: SharedObservable<TransmissionProgress>,
) -> Result<http::Response<Bytes>, HttpError>;
}

#[derive(Debug)]
pub(crate) struct HttpClient {
pub(crate) inner: Arc<dyn HttpSend>,
pub(crate) inner: reqwest::Client,
pub(crate) request_config: RequestConfig,
next_request_id: Arc<AtomicU64>,
}

impl HttpClient {
pub(crate) fn new(inner: Arc<dyn HttpSend>, request_config: RequestConfig) -> Self {
pub(crate) fn new(inner: reqwest::Client, request_config: RequestConfig) -> Self {
HttpClient { inner, request_config, next_request_id: AtomicU64::new(0).into() }
}

Expand Down Expand Up @@ -192,7 +123,11 @@ impl HttpClient {

#[cfg(target_arch = "wasm32")]
let response = {
let response = self.inner.send_request(request, config.timeout, send_progress).await?;
// Silence unused warning
_ = (config, send_progress);

let request = reqwest::Request::try_from(request)?;
let response = response_to_http_response(self.inner.execute(request).await?).await?;

let status_code = response.status();
let response_size = ByteSize(response.body().len().try_into().unwrap_or(u64::MAX));
Expand Down Expand Up @@ -271,11 +206,14 @@ impl HttpClient {
}
};

let response = self
.inner
.send_request(clone_request(&request), config.timeout, send_progress)
.await
.map_err(error_type)?;
let response = send_request(
&self.inner,
clone_request(&request),
config.timeout,
send_progress,
)
.await
.map_err(error_type)?;

let status_code = response.status();
let response_size = ByteSize(response.body().len().try_into().unwrap_or(u64::MAX));
Expand Down Expand Up @@ -389,55 +327,42 @@ impl HttpClient {
}
}

#[cfg(not(target_arch = "wasm32"))]
#[derive(Clone, Debug)]
pub(crate) struct HttpSettings {
#[cfg(not(target_arch = "wasm32"))]
pub(crate) disable_ssl_verification: bool,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) proxy: Option<String>,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) user_agent: Option<String>,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) timeout: Duration,
}

#[allow(clippy::derivable_impls)]
#[cfg(not(target_arch = "wasm32"))]
impl Default for HttpSettings {
fn default() -> Self {
Self {
#[cfg(not(target_arch = "wasm32"))]
disable_ssl_verification: false,
#[cfg(not(target_arch = "wasm32"))]
proxy: None,
#[cfg(not(target_arch = "wasm32"))]
user_agent: None,
#[cfg(not(target_arch = "wasm32"))]
timeout: DEFAULT_REQUEST_TIMEOUT,
}
}
}

#[cfg(not(target_arch = "wasm32"))]
impl HttpSettings {
/// Build a client with the specified configuration.
pub(crate) fn make_client(&self) -> Result<reqwest::Client, HttpError> {
#[allow(unused_mut)]
let mut http_client = reqwest::Client::builder();
let user_agent = self.user_agent.clone().unwrap_or_else(|| "matrix-rust-sdk".to_owned());
let mut http_client =
reqwest::Client::builder().user_agent(user_agent).timeout(self.timeout);

#[cfg(not(target_arch = "wasm32"))]
{
if self.disable_ssl_verification {
http_client = http_client.danger_accept_invalid_certs(true)
}

if let Some(p) = &self.proxy {
http_client = http_client.proxy(reqwest::Proxy::all(p.as_str())?);
}

let user_agent =
self.user_agent.clone().unwrap_or_else(|| "matrix-rust-sdk".to_owned());
if self.disable_ssl_verification {
http_client = http_client.danger_accept_invalid_certs(true)
}

http_client = http_client.user_agent(user_agent).timeout(self.timeout);
};
if let Some(p) = &self.proxy {
http_client = http_client.proxy(reqwest::Proxy::all(p.as_str())?);
}

Ok(http_client.build()?)
}
Expand Down Expand Up @@ -483,49 +408,39 @@ async fn response_to_http_response(
Ok(http_builder.body(body).expect("Can't construct a response using the given body"))
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl HttpSend for reqwest::Client {
async fn send_request(
&self,
request: http::Request<Bytes>,
_timeout: Duration,
_send_progress: SharedObservable<TransmissionProgress>,
) -> Result<http::Response<Bytes>, HttpError> {
#[cfg(not(target_arch = "wasm32"))]
let request = {
use std::convert::Infallible;

use futures_util::stream;

let mut request = if _send_progress.subscriber_count() != 0 {
_send_progress.update(|p| p.total += request.body().len());
reqwest::Request::try_from(request.map(|body| {
let chunks = stream::iter(BytesChunks::new(body, 8192).map(
move |chunk| -> Result<_, Infallible> {
_send_progress.update(|p| p.current += chunk.len());
Ok(chunk)
},
));
reqwest::Body::wrap_stream(chunks)
}))?
} else {
reqwest::Request::try_from(request)?
};

*request.timeout_mut() = Some(_timeout);
request
#[cfg(not(target_arch = "wasm32"))]
async fn send_request(
client: &reqwest::Client,
request: http::Request<Bytes>,
timeout: Duration,
send_progress: SharedObservable<TransmissionProgress>,
) -> Result<http::Response<Bytes>, HttpError> {
use std::convert::Infallible;

use futures_util::stream;

let request = {
let mut request = if send_progress.subscriber_count() != 0 {
send_progress.update(|p| p.total += request.body().len());
reqwest::Request::try_from(request.map(|body| {
let chunks = stream::iter(BytesChunks::new(body, 8192).map(
move |chunk| -> Result<_, Infallible> {
send_progress.update(|p| p.current += chunk.len());
Ok(chunk)
},
));
reqwest::Body::wrap_stream(chunks)
}))?
} else {
reqwest::Request::try_from(request)?
};

// Both reqwest::Body::wrap_stream and the timeout functionality are
// not available on WASM
#[cfg(target_arch = "wasm32")]
let request = reqwest::Request::try_from(request)?;

let response = self.execute(request).await?;
*request.timeout_mut() = Some(timeout);
request
};

Ok(response_to_http_response(response).await?)
}
let response = client.execute(request).await?;
Ok(response_to_http_response(response).await?)
}

#[cfg(not(target_arch = "wasm32"))]
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub use client::{
#[cfg(feature = "image-proc")]
pub use error::ImageError;
pub use error::{Error, HttpError, HttpResult, RefreshTokenError, Result, RumaApiError};
pub use http_client::{HttpSend, TransmissionProgress};
pub use http_client::TransmissionProgress;
pub use media::Media;
pub use ruma::{IdParseError, OwnedServerName, ServerName};
#[cfg(feature = "experimental-sliding-sync")]
Expand Down

0 comments on commit 4c1a351

Please sign in to comment.