diff --git a/Cargo.toml b/Cargo.toml index b6bb72a..a01107d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,10 +37,11 @@ required-features = [ "warp-compat" ] bytes = "1.0.1" futures-util = "0.3.16" futures-channel = "0.3.16" -headers = "0.3.0" +headers = "0.4.0" htmlescape = "0.3.1" -http = "0.2.3" -http-body = "0.4.0" +http = "1.0.0" +http-body = "1.0.0" +http-body-util = "0.1.0" lazy_static = "1.4.0" libc = { version = "0.2.0", optional = true } log = "0.4.0" @@ -58,13 +59,14 @@ uuid = { version = "1.1.2", features = ["v4"] } xml-rs = "0.8.0" xmltree = "0.10.0" -hyper = {version = "0.14.0", optional = true } +hyper = {version = "1.1.0", optional = true } warp = { version = "0.3.0", optional = true } actix-web = { version = "4.0.0-beta.15", optional = true } [dev-dependencies] clap = { version = "4.0.0", features = ["derive"] } env_logger = "0.10.0" -hyper = { version = "0.14.0", features = [ "http1", "http2", "server", "stream", "runtime" ] } +hyper = { version = "1.1.0", features = [ "http1", "server" ] } +hyper-util = { version = "0.1.2", features = ["tokio"] } tokio = { version = "1.3.0", features = ["full"] } diff --git a/examples/hyper.rs b/examples/hyper.rs index 0b3feb5..c7f5967 100644 --- a/examples/hyper.rs +++ b/examples/hyper.rs @@ -1,31 +1,53 @@ +use std::{convert::Infallible, net::SocketAddr}; + +use hyper::{server::conn::http1, service::service_fn}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; + use dav_server::{fakels::FakeLs, localfs::LocalFs, DavHandler}; -use std::convert::Infallible; #[tokio::main] async fn main() { env_logger::init(); let dir = "/tmp"; - let addr = ([127, 0, 0, 1], 4918).into(); + let addr = SocketAddr::from(([127, 0, 0, 1], 4918)); let dav_server = DavHandler::builder() .filesystem(LocalFs::new(dir, false, false, false)) .locksystem(FakeLs::new()) .build_handler(); - let make_service = hyper::service::make_service_fn(move |_| { + let listener = TcpListener::bind(addr).await.unwrap(); + + println!("Serving on {:?}", addr); + + // We start a loop to continuously accept incoming connections + loop { + let (stream, _) = listener.accept().await.unwrap(); let dav_server = dav_server.clone(); - async move { - let func = move |req| { - let dav_server = dav_server.clone(); - async move { Ok::<_, Infallible>(dav_server.handle(req).await) } - }; - Ok::<_, Infallible>(hyper::service::service_fn(func)) - } - }); - - println!("hyper example: listening on {:?} serving {}", addr, dir); - let _ = hyper::Server::bind(&addr) - .serve(make_service) - .await - .map_err(|e| eprintln!("server error: {}", e)); + + // Use an adapter to access something implementing `tokio::io` traits as if they implement + // `hyper::rt` IO traits. + let io = TokioIo::new(stream); + + // Spawn a tokio task to serve multiple connections concurrently + tokio::task::spawn(async move { + // Finally, we bind the incoming connection to our `hello` service + if let Err(err) = http1::Builder::new() + // `service_fn` converts our function in a `Service` + .serve_connection( + io, + service_fn({ + move |req| { + let dav_server = dav_server.clone(); + async move { Ok::<_, Infallible>(dav_server.handle(req).await) } + } + }), + ) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } } diff --git a/examples/sample-litmus-server.rs b/examples/sample-litmus-server.rs index b38277c..7060ccf 100644 --- a/examples/sample-litmus-server.rs +++ b/examples/sample-litmus-server.rs @@ -5,14 +5,13 @@ // Connect to http://localhost:4918/ // -use std::convert::Infallible; -use std::error::Error; -use std::net::SocketAddr; -use std::str::FromStr; +use std::{convert::Infallible, error::Error, net::SocketAddr, str::FromStr}; use clap::Parser; -use futures_util::future::TryFutureExt; use headers::{authorization::Basic, Authorization, HeaderMapExt}; +use hyper::{server::conn::http1, service::service_fn}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; use dav_server::{body::Body, fakels, localfs, memfs, memls, DavConfig, DavHandler}; @@ -45,7 +44,7 @@ impl Server { async fn handle( &self, - req: hyper::Request, + req: hyper::Request, ) -> Result, Infallible> { let user = if self.auth { // we want the client to authenticate. @@ -112,26 +111,42 @@ async fn main() -> Result<(), Box> { let fakels = args.fakels; let dav_server = Server::new(dir.to_string(), memls, fakels, auth); - let make_service = hyper::service::make_service_fn(|_| { - let dav_server = dav_server.clone(); - async move { - let func = move |req| { - let dav_server = dav_server.clone(); - async move { dav_server.clone().handle(req).await } - }; - Ok::<_, hyper::Error>(hyper::service::service_fn(func)) - } - }); let port = args.port; let addr = format!("0.0.0.0:{}", port); let addr = SocketAddr::from_str(&addr)?; - let server = hyper::Server::try_bind(&addr)? - .serve(make_service) - .map_err(|e| eprintln!("server error: {}", e)); + let listener = TcpListener::bind(addr).await?; println!("Serving {} on {}", name, port); - let _ = server.await; - Ok(()) + + // We start a loop to continuously accept incoming connections + loop { + let (stream, _) = listener.accept().await?; + let dav_server = dav_server.clone(); + + // Use an adapter to access something implementing `tokio::io` traits as if they implement + // `hyper::rt` IO traits. + let io = TokioIo::new(stream); + + // Spawn a tokio task to serve multiple connections concurrently + tokio::task::spawn(async move { + // Finally, we bind the incoming connection to our `hello` service + if let Err(err) = http1::Builder::new() + // `service_fn` converts our function in a `Service` + .serve_connection( + io, + service_fn({ + move |req| { + let dav_server = dav_server.clone(); + async move { dav_server.clone().handle(req).await } + } + }), + ) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } } diff --git a/src/body.rs b/src/body.rs index 9279632..ee5103b 100644 --- a/src/body.rs +++ b/src/body.rs @@ -7,7 +7,6 @@ use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; use futures_util::stream::Stream; -use http::header::HeaderMap; use http_body::Body as HttpBody; use crate::async_stream::AsyncStream; @@ -52,18 +51,11 @@ impl HttpBody for Body { type Data = Bytes; type Error = io::Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll>> { - self.poll_next(cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + self.poll_next(cx).map_ok(http_body::Frame::data) } } @@ -119,19 +111,12 @@ where type Data = ReqData; type Error = ReqError; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { let this = self.project(); - this.body.poll_next(cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) + this.body.poll_next(cx).map_ok(http_body::Frame::data) } } diff --git a/src/davhandler.rs b/src/davhandler.rs index 6f5de20..f074031 100644 --- a/src/davhandler.rs +++ b/src/davhandler.rs @@ -11,6 +11,7 @@ use futures_util::stream::Stream; use headers::HeaderMapExt; use http::{Request, Response, StatusCode}; use http_body::Body as HttpBody; +use http_body_util::BodyExt; use crate::body::{Body, StreamBody}; use crate::davheaders; @@ -375,13 +376,19 @@ impl DavInner { { let mut data = Vec::new(); pin_utils::pin_mut!(body); - while let Some(res) = body.data().await { - let mut buf = res.map_err(|_| { + + while let Some(res) = body.frame().await { + let mut data_frame = res.map_err(|_| { DavError::IoError(io::Error::new( io::ErrorKind::UnexpectedEof, "UnexpectedEof", )) })?; + + let Some(buf) = data_frame.data_mut() else { + continue; + }; + while buf.has_remaining() { if data.len() + buf.remaining() > max_size { return Err(StatusCode::PAYLOAD_TOO_LARGE.into()); diff --git a/src/handle_gethead.rs b/src/handle_gethead.rs index cd7e630..cf303e7 100644 --- a/src/handle_gethead.rs +++ b/src/handle_gethead.rs @@ -127,7 +127,7 @@ impl crate::DavInner { if let Some(r) = req.headers().typed_get::() { trace!("handle_gethead: range header {:?}", r); use std::ops::Bound::*; - for range in r.iter() { + for range in r.satisfiable_ranges(len) { let (start, mut count, valid) = match range { (Included(s), Included(e)) if e >= s => (s, e - s + 1, true), (Included(s), Unbounded) if s <= len => (s, len - s, true), diff --git a/src/handle_put.rs b/src/handle_put.rs index 0b1dd36..e003c8f 100644 --- a/src/handle_put.rs +++ b/src/handle_put.rs @@ -7,6 +7,7 @@ use headers::HeaderMapExt; use http::StatusCode as SC; use http::{self, Request, Response}; use http_body::Body as HttpBody; +use http_body_util::BodyExt; use crate::body::Body; use crate::conditional::if_match_get_tokens; @@ -230,10 +231,14 @@ impl crate::DavInner { // loop, read body, write to file. let mut total = 0u64; - while let Some(data) = body.data().await { - let mut buf = data.map_err(|e| to_ioerror(e))?; - let buflen = buf.remaining(); - total += buflen as u64; + while let Some(data) = body.frame().await { + let data_frame = data.map_err(|e| to_ioerror(e))?; + + let Ok(mut buf) = data_frame.into_data() else { + continue; + }; + + total += buf.remaining() as u64; // consistency check. if have_count && total > count { break; diff --git a/src/lib.rs b/src/lib.rs index 43df22e..1fa8d5e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,36 +70,57 @@ //! //! ```no_run //! use std::convert::Infallible; +//! use std::net::SocketAddr; //! use dav_server::{fakels::FakeLs, localfs::LocalFs, DavHandler}; +//! use hyper::server::conn::http1; +//! use hyper::service::service_fn; +//! use hyper_util::rt::TokioIo; +//! use tokio::net::TcpListener; //! //! #[tokio::main] //! async fn main() { +//! env_logger::init(); //! let dir = "/tmp"; -//! let addr = ([127, 0, 0, 1], 4918).into(); +//! let addr = SocketAddr::from(([127, 0, 0, 1], 4918)); //! //! let dav_server = DavHandler::builder() //! .filesystem(LocalFs::new(dir, false, false, false)) //! .locksystem(FakeLs::new()) //! .build_handler(); //! -//! let make_service = hyper::service::make_service_fn(move |_| { +//! let listener = TcpListener::bind(addr).await.unwrap(); +//! +//! println!("Serving on {:?}", addr); +//! +//! // We start a loop to continuously accept incoming connections +//! loop { +//! let (stream, _) = listener.accept().await.unwrap(); //! let dav_server = dav_server.clone(); -//! async move { -//! let func = move |req| { -//! let dav_server = dav_server.clone(); -//! async move { -//! Ok::<_, Infallible>(dav_server.handle(req).await) -//! } -//! }; -//! Ok::<_, Infallible>(hyper::service::service_fn(func)) -//! } -//! }); -//! -//! println!("Serving {} on {}", dir, addr); -//! let _ = hyper::Server::bind(&addr) -//! .serve(make_service) -//! .await -//! .map_err(|e| eprintln!("server error: {}", e)); +//! +//! // Use an adapter to access something implementing `tokio::io` traits as if they implement +//! // `hyper::rt` IO traits. +//! let io = TokioIo::new(stream); +//! +//! // Spawn a tokio task to serve multiple connections concurrently +//! tokio::task::spawn(async move { +//! // Finally, we bind the incoming connection to our `hello` service +//! if let Err(err) = http1::Builder::new() +//! // `service_fn` converts our function in a `Service` +//! .serve_connection( +//! io, +//! service_fn({ +//! move |req| { +//! let dav_server = dav_server.clone(); +//! async move { Ok::<_, Infallible>(dav_server.handle(req).await) } +//! } +//! }), +//! ) +//! .await +//! { +//! println!("Error serving connection: {:?}", err); +//! } +//! }); +//! } //! } //! ``` //! [DavHandler]: struct.DavHandler.html