Skip to content

Commit

Permalink
Make upload progress observable
Browse files Browse the repository at this point in the history
  • Loading branch information
jplatte committed Jun 20, 2023
1 parent a668822 commit ac140c1
Show file tree
Hide file tree
Showing 14 changed files with 731 additions and 238 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::{
fs,
future::{Future, IntoFuture},
path::Path,
pin::Pin,
};

use eyeball::{shared::Observable as SharedObservable, Subscriber};
use matrix_sdk::{attachment::AttachmentConfig, room::Room, TransmissionProgress};
use mime::Mime;

use super::{Error, Timeline};

pub struct SendAttachment<'a> {
timeline: &'a Timeline,
url: String,
mime_type: Mime,
config: AttachmentConfig,
pub(crate) send_progress: SharedObservable<TransmissionProgress>,
}

impl<'a> SendAttachment<'a> {
pub(crate) fn new(
timeline: &'a Timeline,
url: String,
mime_type: Mime,
config: AttachmentConfig,
) -> Self {
Self { timeline, url, mime_type, config, send_progress: Default::default() }
}

/// Get a subscriber to observe the progress of sending the request
/// body.
#[cfg(not(target_arch = "wasm32"))]
pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
self.send_progress.subscribe()
}
}

impl<'a> IntoFuture for SendAttachment<'a> {
type Output = Result<(), Error>;
#[cfg(target_arch = "wasm32")]
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + 'a>>;
#[cfg(not(target_arch = "wasm32"))]
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;

fn into_future(self) -> Self::IntoFuture {
let Self { timeline, url, mime_type, config, send_progress } = self;
Box::pin(async move {
let Room::Joined(room) = Room::from(timeline.room().clone()) else {
return Err(Error::RoomNotJoined);
};

let body = Path::new(&url)
.file_name()
.ok_or(Error::InvalidAttachmentFileName)?
.to_str()
.expect("path was created from UTF-8 string, hence filename part is UTF-8 too");
let data = fs::read(&url).map_err(|_| Error::InvalidAttachmentData)?;

room.send_attachment(body, &mime_type, data, config)
.with_send_progress_observable(send_progress)
.await
.map_err(|_| Error::FailedSendingAttachment)?;

Ok(())
})
}
}
22 changes: 6 additions & 16 deletions crates/matrix-sdk-ui/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//!
//! See [`Timeline`] for details.
use std::{fs, path::Path, pin::Pin, sync::Arc, task::Poll, time::Duration};
use std::{pin::Pin, sync::Arc, task::Poll, time::Duration};

use async_std::sync::{Condvar, Mutex};
use eyeball_im::VectorDiff;
Expand Down Expand Up @@ -47,6 +47,7 @@ use tracing::{debug, error, info, instrument, warn};
mod builder;
mod event_handler;
mod event_item;
mod futures;
mod inner;
mod pagination;
mod read_receipts;
Expand All @@ -70,6 +71,7 @@ pub use self::{
OtherState, Profile, ReactionGroup, RepliedToEvent, RoomMembershipChange, Sticker,
TimelineDetails, TimelineItemContent,
},
futures::SendAttachment,
pagination::{PaginationOptions, PaginationOutcome},
traits::RoomExt,
virtual_item::VirtualTimelineItem,
Expand Down Expand Up @@ -346,25 +348,13 @@ impl Timeline {
/// * `config` - An attachment configuration object containing details about
/// the attachment
/// like a thumbnail, its size, duration etc.
pub async fn send_attachment(
pub fn send_attachment(
&self,
url: String,
mime_type: Mime,
config: AttachmentConfig,
) -> Result<(), Error> {
let Room::Joined(room) = Room::from(self.room().clone()) else {
return Err(Error::RoomNotJoined);
};

let body =
Path::new(&url).file_name().ok_or(Error::InvalidAttachmentFileName)?.to_str().unwrap();
let data = fs::read(&url).map_err(|_| Error::InvalidAttachmentData)?;

room.send_attachment(body, &mime_type, data, config)
.await
.map_err(|_| Error::FailedSendingAttachment)?;

Ok(())
) -> SendAttachment<'_> {
SendAttachment::new(self, url, mime_type, config)
}

/// Retry sending a message that previously failed to send.
Expand Down
27 changes: 12 additions & 15 deletions crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ features = ["docsrs"]
rustdoc-args = ["--cfg", "docsrs"]

[features]
default = [
"e2e-encryption",
"automatic-room-key-forwarding",
"sqlite",
"native-tls",
]
default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "native-tls"]
testing = []

e2e-encryption = [
Expand All @@ -47,23 +42,22 @@ appservice = ["ruma/appservice-api-s"]
image-proc = ["dep:image"]
image-rayon = ["image-proc", "image?/jpeg_rayon"]

experimental-sliding-sync = ["matrix-sdk-base/experimental-sliding-sync", "reqwest/gzip", "dep:eyeball-im-util"]

docsrs = [
"e2e-encryption",
"sqlite",
"sso-login",
"qrcode",
"image-proc",
experimental-sliding-sync = [
"matrix-sdk-base/experimental-sliding-sync",
"reqwest/gzip",
"dep:eyeball-im-util",
]

docsrs = ["e2e-encryption", "sqlite", "sso-login", "qrcode", "image-proc"]

[dependencies]
anyhow = { workspace = true, optional = true }
anymap2 = "0.13.0"
async-stream = { workspace = true }
async-trait = { workspace = true }
bytes = "1.1.0"
bytesize = "1.1"
cfg-vis = "0.3.0"
dashmap = { workspace = true }
event-listener = "2.5.2"
eyeball = { workspace = true }
Expand All @@ -82,7 +76,6 @@ matrix-sdk-sqlite = { version = "0.1.0", path = "../matrix-sdk-sqlite", default-
mime = "0.3.16"
mime2ext = "0.1.52"
rand = { version = "0.8.5", optional = true }
reqwest = { version = "0.11.10", default_features = false }
ruma = { workspace = true, features = ["rand", "unstable-msc2448", "unstable-msc2965"] }
serde = { workspace = true }
serde_html_form = { workspace = true }
Expand Down Expand Up @@ -116,10 +109,14 @@ optional = true

[target.'cfg(target_arch = "wasm32")'.dependencies]
gloo-timers = { version = "0.2.6", features = ["futures"] }
reqwest = { version = "0.11.10", default_features = false }
tokio = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
backoff = { version = "0.4.0", features = ["tokio"] }
# only activate reqwest's stream feature on non-wasm, the wasm part seems to not
# support *sending* streams, which makes it useless for us.
reqwest = { version = "0.11.10", default_features = false, features = ["stream"] }
tokio = { workspace = true, features = ["fs", "rt", "macros"] }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions crates/matrix-sdk/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ impl ClientBuilder {
None,
None,
&[MatrixVersion::V1_0],
Default::default(),
)
.await
.map_err(|e| match e {
Expand Down
99 changes: 99 additions & 0 deletions crates/matrix-sdk/src/client/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::{
fmt::Debug,
future::{Future, IntoFuture},
pin::Pin,
};

use cfg_vis::cfg_vis;
use eyeball::shared::Observable as SharedObservable;
#[cfg(not(target_arch = "wasm32"))]
use eyeball::Subscriber;
use ruma::api::{client::error::ErrorKind, error::FromHttpResponseError, OutgoingRequest};

use super::super::Client;
use crate::{
config::RequestConfig,
error::{HttpError, HttpResult},
RefreshTokenError, TransmissionProgress,
};

/// `IntoFuture` returned by [`Client::send`].
#[allow(missing_debug_implementations)]
pub struct SendRequest<R> {
pub(crate) client: Client,
pub(crate) request: R,
pub(crate) config: Option<RequestConfig>,
pub(crate) send_progress: SharedObservable<TransmissionProgress>,
}

impl<R> SendRequest<R> {
/// Replace the default `SharedObservable` used for tracking upload
/// progress.
///
/// Note that any subscribers obtained from
/// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
/// will be invalidated by this.
#[cfg_vis(target_arch = "wasm32", pub(crate))]
pub fn with_send_progress_observable(
mut self,
send_progress: SharedObservable<TransmissionProgress>,
) -> Self {
self.send_progress = send_progress;
self
}

/// Get a subscriber to observe the progress of sending the request
/// body.
#[cfg(not(target_arch = "wasm32"))]
pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
self.send_progress.subscribe()
}
}

impl<R> IntoFuture for SendRequest<R>
where
R: OutgoingRequest + Clone + Debug + Send + Sync + 'static,
R::IncomingResponse: Send + Sync,
HttpError: From<FromHttpResponseError<R::EndpointError>>,
{
type Output = HttpResult<R::IncomingResponse>;
#[cfg(target_arch = "wasm32")]
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output>>>;
#[cfg(not(target_arch = "wasm32"))]
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let Self { client, request, config, send_progress } = self;
Box::pin(async move {
let res =
Box::pin(client.send_inner(request.clone(), config, None, send_progress.clone()))
.await;

// If this is an `M_UNKNOWN_TOKEN` error and refresh token handling is active,
// try to refresh the token and retry the request.
if client.inner.handle_refresh_tokens {
if let Err(Some(ErrorKind::UnknownToken { .. })) =
res.as_ref().map_err(HttpError::client_api_error_kind)
{
if let Err(refresh_error) = client.refresh_access_token().await {
match &refresh_error {
HttpError::RefreshToken(RefreshTokenError::RefreshTokenRequired) => {
// Refreshing access tokens is not supported
// by
// this `Session`, ignore.
}
_ => {
return Err(refresh_error);
}
}
} else {
return Box::pin(client.send_inner(request, config, None, send_progress))
.await;
}
}
}

res
})
}
}
Loading

0 comments on commit ac140c1

Please sign in to comment.