From 36b1286ac323de25deacb91063074607d35749d3 Mon Sep 17 00:00:00 2001 From: Constantin Nickel <constantin.nickel@gmail.com> Date: Thu, 7 Dec 2023 20:08:30 +0100 Subject: [PATCH] Update `server-custom-accept` example to `hyper` v1 --- Cargo.toml | 4 +- examples/server-custom-accept.rs | 63 +++++++++++++++++++++++--------- 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2e84e6d..b170ffee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,9 @@ version = "0.26.0" [dev-dependencies] futures-channel = "0.3.28" -hyper = { version = "0.14.25", default-features = false, features = ["http1", "server", "tcp"] } +hyper = { version = "1.0", default-features = false, features = ["http1", "server"] } +hyper-util = { version = "0.1", features = ["tokio"] } +http-body-util = "0.1" tokio = { version = "1.27.0", default-features = false, features = ["io-std", "macros", "net", "rt-multi-thread", "time"] } url = "2.3.1" env_logger = "0.10.0" diff --git a/examples/server-custom-accept.rs b/examples/server-custom-accept.rs index 67ea04ff..5b013a64 100644 --- a/examples/server-custom-accept.rs +++ b/examples/server-custom-accept.rs @@ -26,15 +26,18 @@ use std::{ }; use hyper::{ + body::Incoming, header::{ HeaderValue, CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE, }, - server::conn::AddrStream, - service::{make_service_fn, service_fn}, + server::conn::http1, + service::service_fn, upgrade::Upgraded, - Body, Method, Request, Response, Server, StatusCode, Version, + Method, Request, Response, StatusCode, Version, }; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt}; @@ -50,9 +53,25 @@ use tokio_tungstenite::{ type Tx = UnboundedSender<Message>; type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>; +/// Helper methods to create responses. +mod body { + use http_body_util::{Either, Empty, Full}; + use hyper::body::Bytes; + + pub type Body = Either<Empty<Bytes>, Full<Bytes>>; + + pub fn empty() -> Body { + Either::Left(Empty::new()) + } + + pub fn bytes<B: Into<Bytes>>(chunk: B) -> Body { + Either::Right(Full::from(chunk.into())) + } +} + async fn handle_connection( peer_map: PeerMap, - ws_stream: WebSocketStream<Upgraded>, + ws_stream: WebSocketStream<TokioIo<Upgraded>>, addr: SocketAddr, ) { println!("WebSocket connection established: {}", addr); @@ -89,9 +108,9 @@ async fn handle_connection( async fn handle_request( peer_map: PeerMap, - mut req: Request<Body>, + mut req: Request<Incoming>, addr: SocketAddr, -) -> Result<Response<Body>, Infallible> { +) -> Result<Response<body::Body>, Infallible> { println!("Received a new, potentially ws handshake"); println!("The request's path is: {}", req.uri().path()); println!("The request's headers are:"); @@ -122,12 +141,13 @@ async fn handle_request( || key.is_none() || req.uri() != "/socket" { - return Ok(Response::new(Body::from("Hello World!"))); + return Ok(Response::new(body::bytes("Hello World!"))); } let ver = req.version(); tokio::task::spawn(async move { match hyper::upgrade::on(&mut req).await { Ok(upgraded) => { + let upgraded = TokioIo::new(upgraded); handle_connection( peer_map, WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await, @@ -138,7 +158,7 @@ async fn handle_request( Err(e) => println!("upgrade error: {}", e), } }); - let mut res = Response::new(Body::empty()); + let mut res = Response::new(body::empty()); *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; *res.version_mut() = ver; res.headers_mut().append(CONNECTION, upgrade); @@ -151,21 +171,28 @@ async fn handle_request( } #[tokio::main] -async fn main() -> Result<(), hyper::Error> { +async fn main() -> Result<(), Box<dyn std::error::Error>> { let state = PeerMap::new(Mutex::new(HashMap::new())); - let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()).parse().unwrap(); + let addr = + env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()).parse::<SocketAddr>()?; + + let listener = TcpListener::bind(addr).await?; - let make_svc = make_service_fn(move |conn: &AddrStream| { - let remote_addr = conn.remote_addr(); + loop { + let (stream, remote_addr) = listener.accept().await?; let state = state.clone(); - let service = service_fn(move |req| handle_request(state.clone(), req, remote_addr)); - async { Ok::<_, Infallible>(service) } - }); - let server = Server::bind(&addr).serve(make_svc); + tokio::spawn(async move { + let io = TokioIo::new(stream); - server.await?; + let service = service_fn(move |req| handle_request(state.clone(), req, remote_addr)); - Ok::<_, hyper::Error>(()) + let conn = http1::Builder::new().serve_connection(io, service).with_upgrades(); + + if let Err(err) = conn.await { + eprintln!("failed to serve connection: {err:?}"); + } + }); + } }