Skip to content

Commit

Permalink
feat(!): Make read_dir failable (#31)
Browse files Browse the repository at this point in the history
* feat: Make read_dir failable

Signed-off-by: Xuanwo <[email protected]>

* Handle error correctly

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Jun 18, 2024
1 parent 0e39d10 commit f668f29
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub struct DavProp {
/// Future returned by almost all of the DavFileSystem methods.
pub type FsFuture<'a, T> = Pin<Box<dyn Future<Output = FsResult<T>> + Send + 'a>>;
/// Convenience alias for a boxed Stream.
pub type FsStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
pub type FsStream<T> = Pin<Box<dyn Stream<Item = FsResult<T>> + Send>>;

/// Used as argument to the read_dir() method.
/// It is:
Expand Down
5 changes: 5 additions & 0 deletions src/handle_copymove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ impl crate::DavInner {
// Last seen error is returned from function.
let mut retval = Ok::<_, DavError>(());
while let Some(dirent) = entries.next().await {
let dirent = match dirent {
Ok(dirent) => dirent,
Err(e) => return add_status(multierror, source, e).await,
};

// NOTE: dirent.metadata() behaves like symlink_metadata()
let meta = match dirent.metadata().await {
Ok(meta) => meta,
Expand Down
8 changes: 8 additions & 0 deletions src/handle_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ impl crate::DavInner {

let mut result = Ok(());
while let Some(dirent) = entries.next().await {
let dirent = match dirent {
Ok(dirent) => dirent,
Err(e) => {
result = Err(add_status(res, path, e).await);
continue;
}
};

// if metadata() fails, skip to next entry.
// NOTE: dirent.metadata == symlink_metadata (!)
let meta = match dirent.metadata().await {
Expand Down
8 changes: 8 additions & 0 deletions src/handle_gethead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@ impl crate::DavInner {

let mut dirents: Vec<Dirent> = Vec::new();
while let Some(dirent) = entries.next().await {
let dirent = match dirent {
Ok(dirent) => dirent,
Err(e) => {
trace!("next dir entry error happened. Skipping {:?}", e);
continue;
}
};

let mut name = dirent.name();
if name.starts_with(b".") {
continue;
Expand Down
8 changes: 8 additions & 0 deletions src/handle_props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ impl DavInner {
};

while let Some(dirent) = entries.next().await {
let dirent = match dirent {
Ok(dirent) => dirent,
Err(e) => {
trace!("next dir entry error happened. Skipping {:?}", e);
continue;
}
};

let mut npath = path.clone();
npath.push_segment(&dirent.name());
let meta = match dirent.metadata().await {
Expand Down
16 changes: 13 additions & 3 deletions src/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl LocalFsReadDir {

// The stream implementation tries to be smart and batch I/O operations
impl<'a> Stream for LocalFsReadDir {
type Item = Box<dyn DavDirEntry>;
type Item = FsResult<Box<dyn DavDirEntry>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
Expand Down Expand Up @@ -569,8 +569,18 @@ impl<'a> Stream for LocalFsReadDir {

// we filled the buffer, now pop from the buffer.
match this.buffer.pop_front() {
Some(Ok(item)) => Poll::Ready(Some(Box::new(item))),
Some(Err(_)) | None => {
Some(Ok(item)) => Poll::Ready(Some(Ok(Box::new(item)))),
Some(Err(err)) => {
// fuse the iterator.
this.iterator.take();
// finish the cache.
if let Some(ref mut nb) = this.dir_cache {
nb.finish();
}
// return error of stream.
Poll::Ready(Some(Err(err.into())))
}
None => {
// fuse the iterator.
this.iterator.take();
// finish the cache.
Expand Down
3 changes: 2 additions & 1 deletion src/memfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use bytes::{Buf, Bytes};
use futures_util::{
future,
future::{BoxFuture, FutureExt},
StreamExt,
};
use http::StatusCode;

Expand Down Expand Up @@ -152,7 +153,7 @@ impl DavFileSystem for MemFs {
v.push(Box::new(node.as_dirent(&name)));
}
}
let strm = futures_util::stream::iter(v.into_iter());
let strm = futures_util::stream::iter(v.into_iter()).map(Ok);
Ok(Box::pin(strm) as FsStream<Box<dyn DavDirEntry>>)
}
.boxed()
Expand Down

0 comments on commit f668f29

Please sign in to comment.