diff --git a/src/fs.rs b/src/fs.rs index e05474a..036988c 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -115,7 +115,7 @@ pub struct DavProp { /// Future returned by almost all of the DavFileSystem methods. pub type FsFuture<'a, T> = Pin> + Send + 'a>>; /// Convenience alias for a boxed Stream. -pub type FsStream = Pin + Send>>; +pub type FsStream = Pin> + Send>>; /// Used as argument to the read_dir() method. /// It is: diff --git a/src/handle_copymove.rs b/src/handle_copymove.rs index b7b105f..6daf20e 100644 --- a/src/handle_copymove.rs +++ b/src/handle_copymove.rs @@ -85,6 +85,11 @@ impl crate::DavInner { // Last seen error is returned from function. let mut retval = Ok::<_, DavError>(()); while let Some(dirent) = entries.next().await { + let dirent = match dirent { + Ok(dirent) => dirent, + Err(e) => return add_status(multierror, source, e).await, + }; + // NOTE: dirent.metadata() behaves like symlink_metadata() let meta = match dirent.metadata().await { Ok(meta) => meta, diff --git a/src/handle_delete.rs b/src/handle_delete.rs index e0801ff..f191b93 100644 --- a/src/handle_delete.rs +++ b/src/handle_delete.rs @@ -66,6 +66,14 @@ impl crate::DavInner { let mut result = Ok(()); while let Some(dirent) = entries.next().await { + let dirent = match dirent { + Ok(dirent) => dirent, + Err(e) => { + result = Err(add_status(res, path, e).await); + continue; + } + }; + // if metadata() fails, skip to next entry. // NOTE: dirent.metadata == symlink_metadata (!) let meta = match dirent.metadata().await { diff --git a/src/handle_gethead.rs b/src/handle_gethead.rs index cf303e7..6395436 100644 --- a/src/handle_gethead.rs +++ b/src/handle_gethead.rs @@ -325,6 +325,14 @@ impl crate::DavInner { let mut dirents: Vec = Vec::new(); while let Some(dirent) = entries.next().await { + let dirent = match dirent { + Ok(dirent) => dirent, + Err(e) => { + trace!("next dir entry error happened. Skipping {:?}", e); + continue; + } + }; + let mut name = dirent.name(); if name.starts_with(b".") { continue; diff --git a/src/handle_props.rs b/src/handle_props.rs index 74c0916..6d6304c 100644 --- a/src/handle_props.rs +++ b/src/handle_props.rs @@ -239,6 +239,14 @@ impl DavInner { }; while let Some(dirent) = entries.next().await { + let dirent = match dirent { + Ok(dirent) => dirent, + Err(e) => { + trace!("next dir entry error happened. Skipping {:?}", e); + continue; + } + }; + let mut npath = path.clone(); npath.push_segment(&dirent.name()); let meta = match dirent.metadata().await { diff --git a/src/localfs.rs b/src/localfs.rs index 0198ee7..1e37e1d 100644 --- a/src/localfs.rs +++ b/src/localfs.rs @@ -532,7 +532,7 @@ impl LocalFsReadDir { // The stream implementation tries to be smart and batch I/O operations impl<'a> Stream for LocalFsReadDir { - type Item = Box; + type Item = FsResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = Pin::into_inner(self); @@ -569,8 +569,18 @@ impl<'a> Stream for LocalFsReadDir { // we filled the buffer, now pop from the buffer. match this.buffer.pop_front() { - Some(Ok(item)) => Poll::Ready(Some(Box::new(item))), - Some(Err(_)) | None => { + Some(Ok(item)) => Poll::Ready(Some(Ok(Box::new(item)))), + Some(Err(err)) => { + // fuse the iterator. + this.iterator.take(); + // finish the cache. + if let Some(ref mut nb) = this.dir_cache { + nb.finish(); + } + // return error of stream. + Poll::Ready(Some(Err(err.into()))) + } + None => { // fuse the iterator. this.iterator.take(); // finish the cache. diff --git a/src/memfs.rs b/src/memfs.rs index 1386b9a..935921e 100644 --- a/src/memfs.rs +++ b/src/memfs.rs @@ -15,6 +15,7 @@ use bytes::{Buf, Bytes}; use futures_util::{ future, future::{BoxFuture, FutureExt}, + StreamExt, }; use http::StatusCode; @@ -152,7 +153,7 @@ impl DavFileSystem for MemFs { v.push(Box::new(node.as_dirent(&name))); } } - let strm = futures_util::stream::iter(v.into_iter()); + let strm = futures_util::stream::iter(v.into_iter()).map(Ok); Ok(Box::pin(strm) as FsStream>) } .boxed()