From 4ce7949f4e298791081c5e683e3384a32da3e67d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 9 Nov 2020 14:59:35 +0100 Subject: [PATCH 01/10] refactor generics in `listener` submodule --- src/listener/concurrent_listener.rs | 15 ++++- src/listener/failover_listener.rs | 20 +++++-- src/listener/to_listener_impls.rs | 88 ++++++++++++++++++++++------- src/server.rs | 10 +++- 4 files changed, 104 insertions(+), 29 deletions(-) diff --git a/src/listener/concurrent_listener.rs b/src/listener/concurrent_listener.rs index 1cff9b0d7..4fdb014bb 100644 --- a/src/listener/concurrent_listener.rs +++ b/src/listener/concurrent_listener.rs @@ -55,7 +55,10 @@ impl ConcurrentListener { /// # std::mem::drop(tide::new().listen(listener)); // for the State generic /// # Ok(()) } /// ``` - pub fn add>(&mut self, listener: TL) -> io::Result<()> { + pub fn add(&mut self, listener: L) -> io::Result<()> + where + L: ToListener, + { self.0.push(Box::new(listener.to_listener()?)); Ok(()) } @@ -71,14 +74,20 @@ impl ConcurrentListener { /// .with_listener(async_std::net::TcpListener::bind("127.0.0.1:8081").await?), /// ).await?; /// # Ok(()) }) } - pub fn with_listener>(mut self, listener: TL) -> Self { + pub fn with_listener(mut self, listener: L) -> Self + where + L: ToListener, + { self.add(listener).expect("Unable to add listener"); self } } #[async_trait::async_trait] -impl Listener for ConcurrentListener { +impl Listener for ConcurrentListener +where + State: Clone + Send + Sync + 'static, +{ async fn listen(&mut self, app: Server) -> io::Result<()> { let mut futures_unordered = FuturesUnordered::new(); diff --git a/src/listener/failover_listener.rs b/src/listener/failover_listener.rs index 4ab1bd242..340d2c72a 100644 --- a/src/listener/failover_listener.rs +++ b/src/listener/failover_listener.rs @@ -35,7 +35,10 @@ use async_std::io; #[derive(Default)] pub struct FailoverListener(Vec>>); -impl FailoverListener { +impl FailoverListener +where + State: Clone + Send + Sync + 'static, +{ /// creates a new FailoverListener pub fn new() -> Self { Self(vec![]) @@ -57,7 +60,10 @@ impl FailoverListener { /// # std::mem::drop(tide::new().listen(listener)); // for the State generic /// # Ok(()) } /// ``` - pub fn add>(&mut self, listener: TL) -> io::Result<()> { + pub fn add(&mut self, listener: L) -> io::Result<()> + where + L: ToListener, + { self.0.push(Box::new(listener.to_listener()?)); Ok(()) } @@ -73,14 +79,20 @@ impl FailoverListener { /// .with_listener(("localhost", 8081)), /// ).await?; /// # Ok(()) }) } - pub fn with_listener>(mut self, listener: TL) -> Self { + pub fn with_listener(mut self, listener: L) -> Self + where + L: ToListener, + { self.add(listener).expect("Unable to add listener"); self } } #[async_trait::async_trait] -impl Listener for FailoverListener { +impl Listener for FailoverListener +where + State: Clone + Send + Sync + 'static, +{ async fn listen(&mut self, app: Server) -> io::Result<()> { for listener in self.0.iter_mut() { let app = app.clone(); diff --git a/src/listener/to_listener_impls.rs b/src/listener/to_listener_impls.rs index a1b406d1f..de7bb5fa7 100644 --- a/src/listener/to_listener_impls.rs +++ b/src/listener/to_listener_impls.rs @@ -5,7 +5,10 @@ use crate::http::url::Url; use async_std::io; use std::net::ToSocketAddrs; -impl ToListener for Url { +impl ToListener for Url +where + State: Clone + Send + Sync + 'static, +{ type Listener = ParsedListener; fn to_listener(self) -> io::Result { @@ -48,14 +51,20 @@ impl ToListener for Url { } } -impl ToListener for String { +impl ToListener for String +where + State: Clone + Send + Sync + 'static, +{ type Listener = ParsedListener; fn to_listener(self) -> io::Result { ToListener::::to_listener(self.as_str()) } } -impl ToListener for &str { +impl ToListener for &str +where + State: Clone + Send + Sync + 'static, +{ type Listener = ParsedListener; fn to_listener(self) -> io::Result { @@ -75,7 +84,10 @@ impl ToListener for &str { } #[cfg(unix)] -impl ToListener for async_std::path::PathBuf { +impl ToListener for async_std::path::PathBuf +where + State: Clone + Send + Sync + 'static, +{ type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_path(self)) @@ -83,28 +95,40 @@ impl ToListener for async_std::path } #[cfg(unix)] -impl ToListener for std::path::PathBuf { +impl ToListener for std::path::PathBuf +where + State: Clone + Send + Sync + 'static, +{ type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_path(self)) } } -impl ToListener for async_std::net::TcpListener { +impl ToListener for async_std::net::TcpListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_listener(self)) } } -impl ToListener for std::net::TcpListener { +impl ToListener for std::net::TcpListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_listener(self)) } } -impl ToListener for (&str, u16) { +impl ToListener for (&str, u16) +where + State: Clone + Send + Sync + 'static, +{ type Listener = TcpListener; fn to_listener(self) -> io::Result { @@ -113,8 +137,9 @@ impl ToListener for (&str, u16) { } #[cfg(unix)] -impl ToListener - for async_std::os::unix::net::UnixListener +impl ToListener for async_std::os::unix::net::UnixListener +where + State: Clone + Send + Sync + 'static, { type Listener = UnixListener; fn to_listener(self) -> io::Result { @@ -123,14 +148,20 @@ impl ToListener } #[cfg(unix)] -impl ToListener for std::os::unix::net::UnixListener { +impl ToListener for std::os::unix::net::UnixListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_listener(self)) } } -impl ToListener for TcpListener { +impl ToListener for TcpListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) @@ -138,42 +169,61 @@ impl ToListener for TcpListener { } #[cfg(unix)] -impl ToListener for UnixListener { +impl ToListener for UnixListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for ConcurrentListener { +impl ToListener for ConcurrentListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for ParsedListener { +impl ToListener for ParsedListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for FailoverListener { +impl ToListener for FailoverListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for std::net::SocketAddr { +impl ToListener for std::net::SocketAddr +where + State: Clone + Send + Sync + 'static, +{ type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_addrs(vec![self])) } } -impl, State: Clone + Send + Sync + 'static> ToListener for Vec { +impl ToListener for Vec +where + L: ToListener, + State: Clone + Send + Sync + 'static, +{ type Listener = ConcurrentListener; fn to_listener(self) -> io::Result { let mut concurrent_listener = ConcurrentListener::new(); @@ -188,7 +238,7 @@ impl, State: Clone + Send + Sync + 'static> ToListener>(listener: TL) -> io::Result { + fn listen>(listener: L) -> io::Result { listener.to_listener() } diff --git a/src/server.rs b/src/server.rs index 853d39e34..f7825f18b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -67,7 +67,10 @@ impl Default for Server<()> { } } -impl Server { +impl Server +where + State: Clone + Send + Sync + 'static, +{ /// Create a new Tide server with shared application scoped state. /// /// Application scoped state is useful for storing items @@ -185,8 +188,9 @@ impl Server { self } - /// Asynchronously serve the app with the supplied listener. For more details, see [Listener] and [ToListener] - pub async fn listen>(self, listener: TL) -> io::Result<()> { + /// Asynchronously serve the app with the supplied listener. For more + /// details, see [Listener] and [ToListener] + pub async fn listen>(self, listener: L) -> io::Result<()> { listener.to_listener()?.listen(self).await } From 1cb21cf0eab7920c0adc4dd0bc19f825708f3e2d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 9 Nov 2020 16:43:38 +0100 Subject: [PATCH 02/10] Add bind to Listener trait --- src/listener/concurrent_listener.rs | 11 +++++-- src/listener/failover_listener.rs | 51 +++++++++++++++++++++-------- src/listener/mod.rs | 22 ++++++++++++- src/listener/parsed_listener.rs | 18 +++++++--- src/listener/tcp_listener.rs | 19 +++++------ src/server.rs | 4 ++- 6 files changed, 94 insertions(+), 31 deletions(-) diff --git a/src/listener/concurrent_listener.rs b/src/listener/concurrent_listener.rs index 4fdb014bb..c2b167118 100644 --- a/src/listener/concurrent_listener.rs +++ b/src/listener/concurrent_listener.rs @@ -88,12 +88,19 @@ impl Listener for ConcurrentListener where State: Clone + Send + Sync + 'static, { - async fn listen(&mut self, app: Server) -> io::Result<()> { + async fn bind(&mut self) -> io::Result<()> { + for listener in self.0.iter_mut() { + listener.bind().await?; + } + Ok(()) + } + + async fn accept(&mut self, app: Server) -> io::Result<()> { let mut futures_unordered = FuturesUnordered::new(); for listener in self.0.iter_mut() { let app = app.clone(); - futures_unordered.push(listener.listen(app)); + futures_unordered.push(listener.accept(app)); } while let Some(result) = futures_unordered.next().await { diff --git a/src/listener/failover_listener.rs b/src/listener/failover_listener.rs index 340d2c72a..c1ee98c3e 100644 --- a/src/listener/failover_listener.rs +++ b/src/listener/failover_listener.rs @@ -31,9 +31,11 @@ use async_std::io; /// }) ///} ///``` - #[derive(Default)] -pub struct FailoverListener(Vec>>); +pub struct FailoverListener { + listeners: Vec>>>, + index: Option, +} impl FailoverListener where @@ -41,7 +43,10 @@ where { /// creates a new FailoverListener pub fn new() -> Self { - Self(vec![]) + Self { + listeners: vec![], + index: None, + } } /// Adds any [`ToListener`](crate::listener::ToListener) to this @@ -64,7 +69,7 @@ where where L: ToListener, { - self.0.push(Box::new(listener.to_listener()?)); + self.listeners.push(Some(Box::new(listener.to_listener()?))); Ok(()) } @@ -93,13 +98,16 @@ impl Listener for FailoverListener where State: Clone + Send + Sync + 'static, { - async fn listen(&mut self, app: Server) -> io::Result<()> { - for listener in self.0.iter_mut() { - let app = app.clone(); - match listener.listen(app).await { - Ok(_) => return Ok(()), + async fn bind(&mut self) -> io::Result<()> { + for (index, listener) in self.listeners.iter_mut().enumerate() { + let listener = listener.as_deref_mut().expect("bind called twice"); + match listener.bind().await { + Ok(_) => { + self.index = Some(index); + return Ok(()); + } Err(e) => { - crate::log::info!("unable to listen", { + crate::log::info!("unable to bind", { listener: listener.to_string(), error: e.to_string() }); @@ -112,20 +120,37 @@ where "unable to bind to any supplied listener spec", )) } + + async fn accept(&mut self, app: Server) -> io::Result<()> { + match self.index { + Some(index) => { + let mut listener = self.listeners[index].take().expect("accept called twice"); + listener.accept(app).await?; + Ok(()) + } + None => Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "unable to listen to any supplied listener spec", + )), + } + } } impl Debug for FailoverListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.0) + write!(f, "{:?}", self.listeners) } } impl Display for FailoverListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let string = self - .0 + .listeners .iter() - .map(|l| l.to_string()) + .map(|l| match l { + Some(l) => l.to_string(), + None => String::new(), + }) .collect::>() .join(", "); diff --git a/src/listener/mod.rs b/src/listener/mod.rs index 82033de15..47df23bb8 100644 --- a/src/listener/mod.rs +++ b/src/listener/mod.rs @@ -34,10 +34,30 @@ pub(crate) use unix_listener::UnixListener; pub trait Listener: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static { + /// Bind the listener to the specified address. + async fn bind(&mut self) -> io::Result<()>; + /// This is the primary entrypoint for the Listener trait. listen /// is called exactly once, and is expected to spawn tasks for /// each incoming connection. - async fn listen(&mut self, app: Server) -> io::Result<()>; + /// + /// Accept a new incoming connection from this listener. + async fn accept(&mut self, app: Server) -> io::Result<()>; +} + +#[async_trait::async_trait] +impl Listener for Box +where + L: Listener, + State: Send + Sync + 'static, +{ + async fn bind(&mut self) -> io::Result<()> { + self.as_mut().bind().await + } + + async fn accept(&mut self, app: Server) -> io::Result<()> { + self.as_mut().accept(app).await + } } /// crate-internal shared logic used by tcp and unix listeners to diff --git a/src/listener/parsed_listener.rs b/src/listener/parsed_listener.rs index 4b0e186a9..8b8b86888 100644 --- a/src/listener/parsed_listener.rs +++ b/src/listener/parsed_listener.rs @@ -31,12 +31,22 @@ impl Display for ParsedListener { } #[async_trait::async_trait] -impl Listener for ParsedListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { +impl Listener for ParsedListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self) -> io::Result<()> { match self { #[cfg(unix)] - Self::Unix(u) => u.listen(app).await, - Self::Tcp(t) => t.listen(app).await, + Self::Unix(u) => u.bind().await, + Self::Tcp(t) => t.bind().await, + } + } + async fn accept(&mut self, app: Server) -> io::Result<()> { + match self { + #[cfg(unix)] + Self::Unix(u) => u.accept(app).await, + Self::Tcp(t) => t.accept(app).await, } } } diff --git a/src/listener/tcp_listener.rs b/src/listener/tcp_listener.rs index db68530ee..6b54dfa9d 100644 --- a/src/listener/tcp_listener.rs +++ b/src/listener/tcp_listener.rs @@ -42,13 +42,6 @@ impl TcpListener { )), } } - - async fn connect(&mut self) -> io::Result<()> { - if let Self::FromAddrs(addrs, listener @ None) = self { - *listener = Some(net::TcpListener::bind(addrs.as_slice()).await?); - } - Ok(()) - } } fn handle_tcp(app: Server, stream: TcpStream) { @@ -70,10 +63,16 @@ fn handle_tcp(app: Server, stream: #[async_trait::async_trait] impl Listener for TcpListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { - self.connect().await?; - let listener = self.listener()?; + async fn bind(&mut self) -> io::Result<()> { + if let Self::FromAddrs(addrs, listener @ None) = self { + *listener = Some(net::TcpListener::bind(addrs.as_slice()).await?); + } crate::log::info!("Server listening on {}", self); + Ok(()) + } + + async fn accept(&mut self, app: Server) -> io::Result<()> { + let listener = self.listener()?; let mut incoming = listener.incoming(); diff --git a/src/server.rs b/src/server.rs index f7825f18b..91c5598bd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -191,7 +191,9 @@ where /// Asynchronously serve the app with the supplied listener. For more /// details, see [Listener] and [ToListener] pub async fn listen>(self, listener: L) -> io::Result<()> { - listener.to_listener()?.listen(self).await + let mut listener = listener.to_listener()?; + listener.bind().await?; + listener.accept(self).await } /// Respond to a `Request` with a `Response`. From 2e96b0f65544ba126a10762064b76428a3481dd1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 13:32:53 +0100 Subject: [PATCH 03/10] First rewrite works --- src/listener/concurrent_listener.rs | 25 ++++--- src/listener/failover_listener.rs | 8 +- src/listener/mod.rs | 19 +++-- src/listener/parsed_listener.rs | 34 ++++++--- src/listener/tcp_listener.rs | 110 ++++++++++++++++++---------- src/listener/to_listener_impls.rs | 56 +++----------- src/server.rs | 19 ++++- 7 files changed, 148 insertions(+), 123 deletions(-) diff --git a/src/listener/concurrent_listener.rs b/src/listener/concurrent_listener.rs index c2b167118..c06462c63 100644 --- a/src/listener/concurrent_listener.rs +++ b/src/listener/concurrent_listener.rs @@ -33,12 +33,14 @@ use futures_util::stream::{futures_unordered::FuturesUnordered, StreamExt}; ///``` #[derive(Default)] -pub struct ConcurrentListener(Vec>>); +pub struct ConcurrentListener { + listeners: Vec>>, +} impl ConcurrentListener { /// creates a new ConcurrentListener pub fn new() -> Self { - Self(vec![]) + Self { listeners: vec![] } } /// Adds any [`ToListener`](crate::listener::ToListener) to this @@ -59,7 +61,7 @@ impl ConcurrentListener { where L: ToListener, { - self.0.push(Box::new(listener.to_listener()?)); + self.listeners.push(Box::new(listener.to_listener()?)); Ok(()) } @@ -88,19 +90,18 @@ impl Listener for ConcurrentListener where State: Clone + Send + Sync + 'static, { - async fn bind(&mut self) -> io::Result<()> { - for listener in self.0.iter_mut() { - listener.bind().await?; + async fn bind(&mut self, app: Server) -> io::Result<()> { + for listener in self.listeners.iter_mut() { + listener.bind(app.clone()).await?; } Ok(()) } - async fn accept(&mut self, app: Server) -> io::Result<()> { + async fn accept(&mut self) -> io::Result<()> { let mut futures_unordered = FuturesUnordered::new(); - for listener in self.0.iter_mut() { - let app = app.clone(); - futures_unordered.push(listener.accept(app)); + for listener in self.listeners.iter_mut() { + futures_unordered.push(listener.accept()); } while let Some(result) = futures_unordered.next().await { @@ -112,14 +113,14 @@ where impl Debug for ConcurrentListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.0) + write!(f, "{:?}", self.listeners) } } impl Display for ConcurrentListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let string = self - .0 + .listeners .iter() .map(|l| l.to_string()) .collect::>() diff --git a/src/listener/failover_listener.rs b/src/listener/failover_listener.rs index c1ee98c3e..7bf6f891d 100644 --- a/src/listener/failover_listener.rs +++ b/src/listener/failover_listener.rs @@ -98,10 +98,10 @@ impl Listener for FailoverListener where State: Clone + Send + Sync + 'static, { - async fn bind(&mut self) -> io::Result<()> { + async fn bind(&mut self, app: Server) -> io::Result<()> { for (index, listener) in self.listeners.iter_mut().enumerate() { let listener = listener.as_deref_mut().expect("bind called twice"); - match listener.bind().await { + match listener.bind(app.clone()).await { Ok(_) => { self.index = Some(index); return Ok(()); @@ -121,11 +121,11 @@ where )) } - async fn accept(&mut self, app: Server) -> io::Result<()> { + async fn accept(&mut self) -> io::Result<()> { match self.index { Some(index) => { let mut listener = self.listeners[index].take().expect("accept called twice"); - listener.accept(app).await?; + listener.accept().await?; Ok(()) } None => Err(io::Error::new( diff --git a/src/listener/mod.rs b/src/listener/mod.rs index 47df23bb8..07749679c 100644 --- a/src/listener/mod.rs +++ b/src/listener/mod.rs @@ -12,6 +12,8 @@ mod to_listener_impls; #[cfg(all(unix, feature = "h1-server"))] mod unix_listener; +use std::fmt::{Debug, Display}; + use crate::Server; use async_std::io; @@ -31,18 +33,19 @@ pub(crate) use unix_listener::UnixListener; /// you will also need to implement at least one [`ToListener`](crate::listener::ToListener) that /// outputs your Listener type. #[async_trait::async_trait] -pub trait Listener: - std::fmt::Debug + std::fmt::Display + Send + Sync + 'static +pub trait Listener: Debug + Display + Send + Sync + 'static +where + State: Send + Sync + 'static, { /// Bind the listener to the specified address. - async fn bind(&mut self) -> io::Result<()>; + async fn bind(&mut self, app: Server) -> io::Result<()>; /// This is the primary entrypoint for the Listener trait. listen /// is called exactly once, and is expected to spawn tasks for /// each incoming connection. /// /// Accept a new incoming connection from this listener. - async fn accept(&mut self, app: Server) -> io::Result<()>; + async fn accept(&mut self) -> io::Result<()>; } #[async_trait::async_trait] @@ -51,12 +54,12 @@ where L: Listener, State: Send + Sync + 'static, { - async fn bind(&mut self) -> io::Result<()> { - self.as_mut().bind().await + async fn bind(&mut self, app: Server) -> io::Result<()> { + self.as_mut().bind(app).await } - async fn accept(&mut self, app: Server) -> io::Result<()> { - self.as_mut().accept(app).await + async fn accept(&mut self) -> io::Result<()> { + self.as_mut().accept().await } } diff --git a/src/listener/parsed_listener.rs b/src/listener/parsed_listener.rs index 8b8b86888..e58b02a30 100644 --- a/src/listener/parsed_listener.rs +++ b/src/listener/parsed_listener.rs @@ -4,7 +4,7 @@ use super::{Listener, TcpListener}; use crate::Server; use async_std::io; -use std::fmt::{self, Display, Formatter}; +use std::fmt::{self, Debug, Display, Formatter}; /// This is an enum that contains variants for each of the listeners /// that can be parsed from a string. This is used as the associated @@ -13,14 +13,23 @@ use std::fmt::{self, Display, Formatter}; /// /// This is currently crate-visible only, and tide users are expected /// to create these through [ToListener](crate::ToListener) conversions. -#[derive(Debug)] -pub enum ParsedListener { +pub enum ParsedListener { #[cfg(unix)] Unix(UnixListener), - Tcp(TcpListener), + Tcp(TcpListener), } -impl Display for ParsedListener { +impl fmt::Debug for ParsedListener { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + #[cfg(unix)] + ParsedListener::Unix(unix) => Debug::fmt(tcp, f), + ParsedListener::Tcp(tcp) => Debug::fmt(tcp, f), + } + } +} + +impl Display for ParsedListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { #[cfg(unix)] @@ -31,22 +40,23 @@ impl Display for ParsedListener { } #[async_trait::async_trait] -impl Listener for ParsedListener +impl Listener for ParsedListener where State: Clone + Send + Sync + 'static, { - async fn bind(&mut self) -> io::Result<()> { + async fn bind(&mut self, server: Server) -> io::Result<()> { match self { #[cfg(unix)] - Self::Unix(u) => u.bind().await, - Self::Tcp(t) => t.bind().await, + Self::Unix(u) => u.bind(server).await, + Self::Tcp(t) => t.bind(server).await, } } - async fn accept(&mut self, app: Server) -> io::Result<()> { + + async fn accept(&mut self) -> io::Result<()> { match self { #[cfg(unix)] - Self::Unix(u) => u.accept(app).await, - Self::Tcp(t) => t.accept(app).await, + Self::Unix(u) => u.accept().await, + Self::Tcp(t) => t.accept().await, } } } diff --git a/src/listener/tcp_listener.rs b/src/listener/tcp_listener.rs index 6b54dfa9d..1c6a0e2df 100644 --- a/src/listener/tcp_listener.rs +++ b/src/listener/tcp_listener.rs @@ -17,31 +17,56 @@ use async_std::{io, task}; /// /// This is currently crate-visible only, and tide users are expected /// to create these through [ToListener](crate::ToListener) conversions. -#[derive(Debug)] -pub enum TcpListener { - FromListener(net::TcpListener), - FromAddrs(Vec, Option), +pub struct TcpListener { + addrs: Option>, + listener: Option, + server: Option>, } -impl TcpListener { - pub fn from_addrs(addrs: Vec) -> Self { - Self::FromAddrs(addrs, None) +impl fmt::Debug for TcpListener { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("TcpListener") + .field(&"listener", &self.listener) + .field(&"addrs", &self.addrs) + .field( + &"server", + if self.server.is_some() { + &"Some(Server)" + } else { + &"None" + }, + ) + .finish() } +} - pub fn from_listener(tcp_listener: impl Into) -> Self { - Self::FromListener(tcp_listener.into()) +impl TcpListener { + pub fn from_addrs(addrs: Vec) -> Self { + Self { + addrs: Some(addrs), + listener: None, + server: None, + } } - fn listener(&self) -> io::Result<&net::TcpListener> { - match self { - Self::FromAddrs(_, Some(listener)) => Ok(listener), - Self::FromListener(listener) => Ok(listener), - Self::FromAddrs(addrs, None) => Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - format!("unable to connect to {:?}", addrs), - )), + pub fn from_listener(tcp_listener: impl Into) -> Self { + Self { + addrs: None, + listener: Some(tcp_listener.into()), + server: None, } } + + // fn listener(&self) -> io::Result<&net::TcpListener> { + // if let Some(listener) = self.listener { + // Ok(&listener) + // } else { + // Err(io::Error::new( + // io::ErrorKind::AddrNotAvailable, + // format!("unable to connect to {:?}", self.addrs.unwrap()), + // )) + // } + // } } fn handle_tcp(app: Server, stream: TcpStream) { @@ -62,17 +87,35 @@ fn handle_tcp(app: Server, stream: } #[async_trait::async_trait] -impl Listener for TcpListener { - async fn bind(&mut self) -> io::Result<()> { - if let Self::FromAddrs(addrs, listener @ None) = self { - *listener = Some(net::TcpListener::bind(addrs.as_slice()).await?); +impl Listener for TcpListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self, server: Server) -> io::Result<()> { + assert!(self.server.is_none(), "`bind` should only be called once"); + self.server = Some(server); + + if let None = self.listener { + let addrs = self + .addrs + .take() + .expect("`bind` should only be called once"); + let listener = net::TcpListener::bind(addrs.as_slice()).await?; + self.listener = Some(listener); } crate::log::info!("Server listening on {}", self); Ok(()) } - async fn accept(&mut self, app: Server) -> io::Result<()> { - let listener = self.listener()?; + async fn accept(&mut self) -> io::Result<()> { + let server = self + .server + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); + let listener = self + .listener + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); let mut incoming = listener.incoming(); @@ -87,7 +130,7 @@ impl Listener for TcpListener { } Ok(stream) => { - handle_tcp(app.clone(), stream); + handle_tcp(server.clone(), stream); } }; } @@ -95,27 +138,20 @@ impl Listener for TcpListener { } } -impl Display for TcpListener { +impl Display for TcpListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Self::FromListener(l) | Self::FromAddrs(_, Some(l)) => write!( - f, - "http://{}", - l.local_addr() - .ok() - .map(|a| a.to_string()) - .as_deref() - .unwrap_or("[unknown]") - ), - Self::FromAddrs(addrs, None) => write!( + match &self.listener { + Some(listener) => write!( f, "{}", - addrs + listener + .local_addr() .iter() .map(|a| format!("http://{}", a)) .collect::>() .join(", ") ), + None => write!(f, "Not listening. Did you forget to call `Listener::bind`?"), } } } diff --git a/src/listener/to_listener_impls.rs b/src/listener/to_listener_impls.rs index de7bb5fa7..40c010264 100644 --- a/src/listener/to_listener_impls.rs +++ b/src/listener/to_listener_impls.rs @@ -9,7 +9,7 @@ impl ToListener for Url where State: Clone + Send + Sync + 'static, { - type Listener = ParsedListener; + type Listener = ParsedListener; fn to_listener(self) -> io::Result { match self.scheme() { @@ -55,7 +55,7 @@ impl ToListener for String where State: Clone + Send + Sync + 'static, { - type Listener = ParsedListener; + type Listener = ParsedListener; fn to_listener(self) -> io::Result { ToListener::::to_listener(self.as_str()) } @@ -65,7 +65,7 @@ impl ToListener for &str where State: Clone + Send + Sync + 'static, { - type Listener = ParsedListener; + type Listener = ParsedListener; fn to_listener(self) -> io::Result { if let Ok(socket_addrs) = self.to_socket_addrs() { @@ -109,7 +109,7 @@ impl ToListener for async_std::net::TcpListener where State: Clone + Send + Sync + 'static, { - type Listener = TcpListener; + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_listener(self)) } @@ -119,7 +119,7 @@ impl ToListener for std::net::TcpListener where State: Clone + Send + Sync + 'static, { - type Listener = TcpListener; + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_listener(self)) } @@ -129,7 +129,7 @@ impl ToListener for (&str, u16) where State: Clone + Send + Sync + 'static, { - type Listener = TcpListener; + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_addrs(self.to_socket_addrs()?.collect())) @@ -158,7 +158,7 @@ where } } -impl ToListener for TcpListener +impl ToListener for TcpListener where State: Clone + Send + Sync + 'static, { @@ -189,7 +189,7 @@ where } } -impl ToListener for ParsedListener +impl ToListener for ParsedListener where State: Clone + Send + Sync + 'static, { @@ -213,7 +213,7 @@ impl ToListener for std::net::SocketAddr where State: Clone + Send + Sync + 'static, { - type Listener = TcpListener; + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_addrs(vec![self])) } @@ -245,48 +245,24 @@ mod parse_tests { #[test] fn url_to_tcp_listener() { let listener = listen(Url::parse("http://localhost:8000").unwrap()).unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); - assert!(listener.to_string().contains("http://127.0.0.1:8000")); + assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen(Url::parse("tcp://localhost:8000").unwrap()).unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen(Url::parse("http://127.0.0.1").unwrap()).unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!(listener.to_string(), "http://127.0.0.1:80"); } #[test] fn str_url_to_tcp_listener() { let listener = listen("tcp://localhost:8000").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen("tcp://localhost:8000").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen("tcp://127.0.0.1").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!(listener.to_string(), "http://127.0.0.1:80"); } @@ -366,24 +342,12 @@ mod parse_tests { #[test] fn str_to_socket_addr() { let listener = listen("127.0.0.1:1312").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!("http://127.0.0.1:1312", listener.to_string()); let listener = listen("[::1]:1312").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!("http://[::1]:1312", listener.to_string()); let listener = listen("localhost:3000").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains(":3000")); } diff --git a/src/server.rs b/src/server.rs index 91c5598bd..71b6e832c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -188,12 +188,23 @@ where self } - /// Asynchronously serve the app with the supplied listener. For more - /// details, see [Listener] and [ToListener] + // /// Asynchronously serve the app with the supplied listener. For more + // /// details, see [Listener] and [ToListener] pub async fn listen>(self, listener: L) -> io::Result<()> { let mut listener = listener.to_listener()?; - listener.bind().await?; - listener.accept(self).await + listener.bind(self).await?; + listener.accept().await?; + Ok(()) + } + + /// Asynchronously bind the listener. + pub async fn bind>( + self, + listener: L, + ) -> io::Result<>::Listener> { + let mut listener = listener.to_listener()?; + listener.bind(self).await?; + Ok(listener) } /// Respond to a `Request` with a `Response`. From ad4490c2ff688998f37890129ae2f1e051539117 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 13:48:25 +0100 Subject: [PATCH 04/10] Fix TCP tests --- src/listener/parsed_listener.rs | 2 +- src/listener/tcp_listener.rs | 34 ++++++++++++--------------------- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/src/listener/parsed_listener.rs b/src/listener/parsed_listener.rs index e58b02a30..364119873 100644 --- a/src/listener/parsed_listener.rs +++ b/src/listener/parsed_listener.rs @@ -19,7 +19,7 @@ pub enum ParsedListener { Tcp(TcpListener), } -impl fmt::Debug for ParsedListener { +impl Debug for ParsedListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { #[cfg(unix)] diff --git a/src/listener/tcp_listener.rs b/src/listener/tcp_listener.rs index 1c6a0e2df..e27863239 100644 --- a/src/listener/tcp_listener.rs +++ b/src/listener/tcp_listener.rs @@ -56,17 +56,6 @@ impl TcpListener { server: None, } } - - // fn listener(&self) -> io::Result<&net::TcpListener> { - // if let Some(listener) = self.listener { - // Ok(&listener) - // } else { - // Err(io::Error::new( - // io::ErrorKind::AddrNotAvailable, - // format!("unable to connect to {:?}", self.addrs.unwrap()), - // )) - // } - // } } fn handle_tcp(app: Server, stream: TcpStream) { @@ -140,18 +129,19 @@ where impl Display for TcpListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let http_fmt = |a| format!("http://{}", a); match &self.listener { - Some(listener) => write!( - f, - "{}", - listener - .local_addr() - .iter() - .map(|a| format!("http://{}", a)) - .collect::>() - .join(", ") - ), - None => write!(f, "Not listening. Did you forget to call `Listener::bind`?"), + Some(listener) => { + let addr = listener.local_addr().expect("Could not get local addr"); + write!(f, "{}", http_fmt(&addr)) + } + None => match &self.addrs { + Some(addrs) => { + let addrs = addrs.iter().map(http_fmt).collect::>().join(", "); + write!(f, "{}", addrs) + } + None => write!(f, "Not listening. Did you forget to call `Listener::bind`?"), + }, } } } From e6b05c5ac1aaa5ab77ef136b0879ec543dfe4ac1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 13:57:07 +0100 Subject: [PATCH 05/10] docs --- src/listener/mod.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/listener/mod.rs b/src/listener/mod.rs index 07749679c..ddcab5e9f 100644 --- a/src/listener/mod.rs +++ b/src/listener/mod.rs @@ -14,8 +14,10 @@ mod unix_listener; use std::fmt::{Debug, Display}; -use crate::Server; use async_std::io; +use async_trait::async_trait; + +use crate::Server; pub use concurrent_listener::ConcurrentListener; pub use failover_listener::FailoverListener; @@ -28,27 +30,26 @@ pub(crate) use tcp_listener::TcpListener; #[cfg(all(unix, feature = "h1-server"))] pub(crate) use unix_listener::UnixListener; -/// The Listener trait represents an implementation of http transport -/// for a tide application. In order to provide a Listener to tide, -/// you will also need to implement at least one [`ToListener`](crate::listener::ToListener) that +/// The Listener trait represents an implementation of http transport for a tide +/// application. In order to provide a Listener to tide, you will also need to +/// implement at least one [`ToListener`](crate::listener::ToListener) that /// outputs your Listener type. -#[async_trait::async_trait] +#[async_trait] pub trait Listener: Debug + Display + Send + Sync + 'static where State: Send + Sync + 'static, { - /// Bind the listener to the specified address. + /// Bind the listener. This starts the listening process by opening the + /// necessary network ports, but not yet accepting incoming connections. This + /// method must be called before `accept`. async fn bind(&mut self, app: Server) -> io::Result<()>; - /// This is the primary entrypoint for the Listener trait. listen - /// is called exactly once, and is expected to spawn tasks for - /// each incoming connection. - /// - /// Accept a new incoming connection from this listener. + /// Start accepting incoming connections. This method must be called only + /// after `bind` has succeeded. async fn accept(&mut self) -> io::Result<()>; } -#[async_trait::async_trait] +#[async_trait] impl Listener for Box where L: Listener, From 4c493f815cb83074fb80f5084661835d74c4af4d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 17:34:15 +0100 Subject: [PATCH 06/10] Add `ListenInfo` instance --- src/listener/concurrent_listener.rs | 10 +++++- src/listener/failover_listener.rs | 12 +++++++ src/listener/mod.rs | 55 +++++++++++++++++++++++++++++ src/listener/parsed_listener.rs | 10 +++++- src/listener/tcp_listener.rs | 54 ++++++++++++++++++---------- src/prelude.rs | 1 + src/server.rs | 50 ++++++++++++++++++++++++-- 7 files changed, 169 insertions(+), 23 deletions(-) diff --git a/src/listener/concurrent_listener.rs b/src/listener/concurrent_listener.rs index c06462c63..486b21e68 100644 --- a/src/listener/concurrent_listener.rs +++ b/src/listener/concurrent_listener.rs @@ -1,4 +1,4 @@ -use crate::listener::{Listener, ToListener}; +use crate::listener::{ListenInfo, Listener, ToListener}; use crate::Server; use std::fmt::{self, Debug, Display, Formatter}; @@ -109,6 +109,14 @@ where } Ok(()) } + + fn info(&self) -> Vec { + self.listeners + .iter() + .map(|listener| listener.info().into_iter()) + .flatten() + .collect() + } } impl Debug for ConcurrentListener { diff --git a/src/listener/failover_listener.rs b/src/listener/failover_listener.rs index 7bf6f891d..d81658c77 100644 --- a/src/listener/failover_listener.rs +++ b/src/listener/failover_listener.rs @@ -5,6 +5,8 @@ use std::fmt::{self, Debug, Display, Formatter}; use async_std::io; +use crate::listener::ListenInfo; + /// FailoverListener allows tide to attempt to listen in a sequential /// order to any number of ports/addresses. The first successful /// listener is used. @@ -134,6 +136,16 @@ where )), } } + + fn info(&self) -> Vec { + match self.index { + Some(index) => match self.listeners.get(index) { + Some(Some(listener)) => listener.info(), + _ => vec![], + }, + None => vec![], + } + } } impl Debug for FailoverListener { diff --git a/src/listener/mod.rs b/src/listener/mod.rs index ddcab5e9f..2d469872d 100644 --- a/src/listener/mod.rs +++ b/src/listener/mod.rs @@ -47,6 +47,10 @@ where /// Start accepting incoming connections. This method must be called only /// after `bind` has succeeded. async fn accept(&mut self) -> io::Result<()>; + + /// Expose information about the connection. This should always return valid + /// data after `bind` has succeeded. + fn info(&self) -> Vec; } #[async_trait] @@ -62,6 +66,10 @@ where async fn accept(&mut self) -> io::Result<()> { self.as_mut().accept().await } + + fn info(&self) -> Vec { + self.as_ref().info() + } } /// crate-internal shared logic used by tcp and unix listeners to @@ -76,3 +84,50 @@ pub(crate) fn is_transient_error(e: &io::Error) -> bool { ConnectionRefused | ConnectionAborted | ConnectionReset ) } + +/// Information about the `Listener`. +/// +/// See [`Report`](../listener/trait.Report.html) for more. +#[derive(Debug, Clone)] +pub struct ListenInfo { + conn_string: String, + transport: String, + tls: bool, +} + +impl ListenInfo { + /// Create a new instance of `ListenInfo`. + /// + /// This method should only be called when implementing a new Tide `listener` + /// strategy. + pub fn new(conn_string: String, transport: String, tls: bool) -> Self { + Self { + conn_string, + transport, + tls, + } + } + + /// Get the connection string. + pub fn connection(&self) -> &str { + self.conn_string.as_str() + } + + /// The underlying transport this connection listens on. + /// + /// Examples are: "tcp", "uds", etc. + pub fn transport(&self) -> &str { + self.transport.as_str() + } + + /// Is the connection encrypted? + pub fn is_encrypted(&self) -> bool { + self.tls + } +} + +impl Display for ListenInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.conn_string) + } +} diff --git a/src/listener/parsed_listener.rs b/src/listener/parsed_listener.rs index 364119873..f4d845e0b 100644 --- a/src/listener/parsed_listener.rs +++ b/src/listener/parsed_listener.rs @@ -1,6 +1,6 @@ #[cfg(unix)] use super::UnixListener; -use super::{Listener, TcpListener}; +use super::{ListenInfo, Listener, TcpListener}; use crate::Server; use async_std::io; @@ -59,4 +59,12 @@ where Self::Tcp(t) => t.accept().await, } } + + fn info(&self) -> Vec { + match self { + #[cfg(unix)] + ParsedListener::Unix(unix) => unix.info(), + ParsedListener::Tcp(tcp) => tcp.info(), + } + } } diff --git a/src/listener/tcp_listener.rs b/src/listener/tcp_listener.rs index e27863239..8e755b208 100644 --- a/src/listener/tcp_listener.rs +++ b/src/listener/tcp_listener.rs @@ -1,4 +1,4 @@ -use super::is_transient_error; +use super::{is_transient_error, ListenInfo}; use crate::listener::Listener; use crate::{log, Server}; @@ -21,23 +21,7 @@ pub struct TcpListener { addrs: Option>, listener: Option, server: Option>, -} - -impl fmt::Debug for TcpListener { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("TcpListener") - .field(&"listener", &self.listener) - .field(&"addrs", &self.addrs) - .field( - &"server", - if self.server.is_some() { - &"Some(Server)" - } else { - &"None" - }, - ) - .finish() - } + info: Option, } impl TcpListener { @@ -46,6 +30,7 @@ impl TcpListener { addrs: Some(addrs), listener: None, server: None, + info: None, } } @@ -54,6 +39,7 @@ impl TcpListener { addrs: None, listener: Some(tcp_listener.into()), server: None, + info: None, } } } @@ -92,7 +78,13 @@ where let listener = net::TcpListener::bind(addrs.as_slice()).await?; self.listener = Some(listener); } - crate::log::info!("Server listening on {}", self); + + // Format the listen information. + let conn_string = format!("{}", self); + let transport = "tcp".to_owned(); + let tls = false; + self.info = Some(ListenInfo::new(conn_string, transport, tls)); + Ok(()) } @@ -125,6 +117,30 @@ where } Ok(()) } + + fn info(&self) -> Vec { + match &self.info { + Some(info) => vec![info.clone()], + None => vec![], + } + } +} + +impl fmt::Debug for TcpListener { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("TcpListener") + .field(&"listener", &self.listener) + .field(&"addrs", &self.addrs) + .field( + &"server", + if self.server.is_some() { + &"Some(Server)" + } else { + &"None" + }, + ) + .finish() + } } impl Display for TcpListener { diff --git a/src/prelude.rs b/src/prelude.rs index 739319081..185b90adf 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,3 +1,4 @@ //! The Tide prelude. pub use crate::convert::{json, Deserialize, Serialize}; +pub use crate::listener::Listener; pub use http_types::Status; diff --git a/src/server.rs b/src/server.rs index 71b6e832c..39a3ed776 100644 --- a/src/server.rs +++ b/src/server.rs @@ -188,16 +188,62 @@ where self } - // /// Asynchronously serve the app with the supplied listener. For more - // /// details, see [Listener] and [ToListener] + /// Asynchronously serve the app with the supplied listener. + /// + /// This is a shorthand for calling `Server::bind`, logging the `ListenInfo` + /// instances from `Listener::info`, and then calling `Listener::accept`. + /// + /// # Examples + /// + /// ```no_run + /// # use async_std::task::block_on; + /// # fn main() -> Result<(), std::io::Error> { block_on(async { + /// # + /// let mut app = tide::new(); + /// app.at("/").get(|_| async { Ok("Hello, world!") }); + /// app.listen("127.0.0.1:8080").await?; + /// # + /// # Ok(()) }) } + /// ``` pub async fn listen>(self, listener: L) -> io::Result<()> { let mut listener = listener.to_listener()?; listener.bind(self).await?; + for info in listener.info().iter() { + log::info!("Server listening on {}", info); + } listener.accept().await?; Ok(()) } /// Asynchronously bind the listener. + /// + /// Bind the listener. This starts the listening process by opening the + /// necessary network ports, but not yet accepting incoming connections. + /// `Listener::listen` should be called after this to start accepting + /// connections. + /// + /// When calling `Listener::info` multiple `ListenInfo` instances may be + /// returned. This is useful when using for example `ConcurrentListener` + /// which enables a single server to listen on muliple ports. + /// + /// # Examples + /// + /// ```no_run + /// # use async_std::task::block_on; + /// # fn main() -> Result<(), std::io::Error> { block_on(async { + /// # + /// use tide::prelude::*; + /// + /// let mut app = tide::new(); + /// app.at("/").get(|_| async { Ok("Hello, world!") }); + /// let mut listener = app.bind("127.0.0.1:8080").await?; + /// for info in listener.info().iter() { + /// println!("Server listening on {}", info); + /// } + /// listener.accept().await?; + /// # + /// # Ok(()) }) } + /// ``` pub async fn bind>( self, listener: L, From 245885cddcdcc3325aa5aeac944a784774c69cee Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 18:00:00 +0100 Subject: [PATCH 07/10] impl unixlistener --- src/listener/parsed_listener.rs | 4 +- src/listener/to_listener_impls.rs | 22 +---- src/listener/unix_listener.rs | 157 +++++++++++++++++++----------- 3 files changed, 106 insertions(+), 77 deletions(-) diff --git a/src/listener/parsed_listener.rs b/src/listener/parsed_listener.rs index f4d845e0b..ad2926a10 100644 --- a/src/listener/parsed_listener.rs +++ b/src/listener/parsed_listener.rs @@ -15,7 +15,7 @@ use std::fmt::{self, Debug, Display, Formatter}; /// to create these through [ToListener](crate::ToListener) conversions. pub enum ParsedListener { #[cfg(unix)] - Unix(UnixListener), + Unix(UnixListener), Tcp(TcpListener), } @@ -23,7 +23,7 @@ impl Debug for ParsedListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { #[cfg(unix)] - ParsedListener::Unix(unix) => Debug::fmt(tcp, f), + ParsedListener::Unix(unix) => Debug::fmt(unix, f), ParsedListener::Tcp(tcp) => Debug::fmt(tcp, f), } } diff --git a/src/listener/to_listener_impls.rs b/src/listener/to_listener_impls.rs index 40c010264..579b5d9ba 100644 --- a/src/listener/to_listener_impls.rs +++ b/src/listener/to_listener_impls.rs @@ -88,7 +88,7 @@ impl ToListener for async_std::path::PathBuf where State: Clone + Send + Sync + 'static, { - type Listener = UnixListener; + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_path(self)) } @@ -99,7 +99,7 @@ impl ToListener for std::path::PathBuf where State: Clone + Send + Sync + 'static, { - type Listener = UnixListener; + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_path(self)) } @@ -141,7 +141,7 @@ impl ToListener for async_std::os::unix::net::UnixListener where State: Clone + Send + Sync + 'static, { - type Listener = UnixListener; + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_listener(self)) } @@ -152,7 +152,7 @@ impl ToListener for std::os::unix::net::UnixListener where State: Clone + Send + Sync + 'static, { - type Listener = UnixListener; + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_listener(self)) } @@ -169,7 +169,7 @@ where } #[cfg(unix)] -impl ToListener for UnixListener +impl ToListener for UnixListener where State: Clone + Send + Sync + 'static, { @@ -273,24 +273,12 @@ mod parse_tests { #[test] fn str_url_to_unix_listener() { let listener = listen("http+unix:///var/run/tide/socket").unwrap(); - assert!(matches!( - listener, - ParsedListener::Unix(UnixListener::FromPath(_, None)) - )); assert_eq!("http+unix:///var/run/tide/socket", listener.to_string()); let listener = listen("http+unix://./socket").unwrap(); - assert!(matches!( - listener, - ParsedListener::Unix(UnixListener::FromPath(_, None)) - )); assert_eq!("http+unix://./socket", listener.to_string()); let listener = listen("http+unix://socket").unwrap(); - assert!(matches!( - listener, - ParsedListener::Unix(UnixListener::FromPath(_, None)) - )); assert_eq!("http+unix://socket", listener.to_string()); } diff --git a/src/listener/unix_listener.rs b/src/listener/unix_listener.rs index 72aff852d..48564f44c 100644 --- a/src/listener/unix_listener.rs +++ b/src/listener/unix_listener.rs @@ -1,4 +1,4 @@ -use super::is_transient_error; +use super::{is_transient_error, ListenInfo}; use crate::listener::Listener; use crate::{log, Server}; @@ -7,63 +7,44 @@ use std::fmt::{self, Display, Formatter}; use async_std::os::unix::net::{self, SocketAddr, UnixStream}; use async_std::prelude::*; -use async_std::{io, path::PathBuf, task}; +use async_std::{io, task}; +use async_std::path::PathBuf; /// This represents a tide [Listener](crate::listener::Listener) that /// wraps an [async_std::os::unix::net::UnixListener]. It is implemented as an /// enum in order to allow creation of a tide::listener::UnixListener -/// from a [PathBuf] that has not yet been opened/bound OR from a bound +/// from a [`PathBuf`] spec that has not yet been bound OR from a bound /// [async_std::os::unix::net::UnixListener]. /// /// This is currently crate-visible only, and tide users are expected /// to create these through [ToListener](crate::ToListener) conversions. -#[derive(Debug)] -pub enum UnixListener { - FromPath(PathBuf, Option), - FromListener(net::UnixListener), +pub struct UnixListener { + path: Option, + listener: Option, + server: Option>, + info: Option, } -impl UnixListener { +impl UnixListener { pub fn from_path(path: impl Into) -> Self { - Self::FromPath(path.into(), None) - } - - pub fn from_listener(listener: impl Into) -> Self { - Self::FromListener(listener.into()) - } - - fn listener(&self) -> io::Result<&net::UnixListener> { - match self { - Self::FromPath(_, Some(listener)) => Ok(listener), - Self::FromListener(listener) => Ok(listener), - Self::FromPath(path, None) => Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - format!( - "unable to connect to {}", - path.to_str().unwrap_or("[unknown]") - ), - )), + Self { + path: Some(path.into()), + listener: None, + server: None, + info: None, } } - async fn connect(&mut self) -> io::Result<()> { - if let Self::FromPath(path, listener @ None) = self { - *listener = Some(net::UnixListener::bind(path).await?); + pub fn from_listener(unix_listener: impl Into) -> Self { + Self { + path: None, + listener: Some(unix_listener.into()), + server: None, + info: None, } - Ok(()) } } -fn unix_socket_addr_to_string(result: io::Result) -> Option { - result.ok().and_then(|addr| { - if let Some(pathname) = addr.as_pathname().and_then(|p| p.canonicalize().ok()) { - Some(format!("http+unix://{}", pathname.display())) - } else { - None - } - }) -} - fn handle_unix(app: Server, stream: UnixStream) { task::spawn(async move { let local_addr = unix_socket_addr_to_string(stream.local_addr()); @@ -82,11 +63,39 @@ fn handle_unix(app: Server, stream: } #[async_trait::async_trait] -impl Listener for UnixListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { - self.connect().await?; - crate::log::info!("Server listening on {}", self); - let listener = self.listener()?; +impl Listener for UnixListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self, server: Server) -> io::Result<()> { + assert!(self.server.is_none(), "`bind` should only be called once"); + self.server = Some(server); + + if let None = self.listener { + let path = self.path.take().expect("`bind` should only be called once"); + let listener = net::UnixListener::bind(path).await?; + self.listener = Some(listener); + } + + // Format the listen information. + let conn_string = format!("{}", self); + let transport = "uds".to_owned(); + let tls = false; + self.info = Some(ListenInfo::new(conn_string, transport, tls)); + + Ok(()) + } + + async fn accept(&mut self) -> io::Result<()> { + let server = self + .server + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); + let listener = self + .listener + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); + let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { @@ -100,28 +109,60 @@ impl Listener for UnixListener { } Ok(stream) => { - handle_unix(app.clone(), stream); + handle_unix(server.clone(), stream); } }; } - Ok(()) } + + fn info(&self) -> Vec { + match &self.info { + Some(info) => vec![info.clone()], + None => vec![], + } + } +} + +impl fmt::Debug for UnixListener { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("UnixListener") + .field(&"listener", &self.listener) + .field(&"path", &self.path) + .field( + &"server", + if self.server.is_some() { + &"Some(Server)" + } else { + &"None" + }, + ) + .finish() + } } -impl Display for UnixListener { +impl Display for UnixListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Self::FromListener(l) | Self::FromPath(_, Some(l)) => write!( - f, - "{}", - unix_socket_addr_to_string(l.local_addr()) - .as_deref() - .unwrap_or("http+unix://[unknown]") - ), - Self::FromPath(path, None) => { - write!(f, "http+unix://{}", path.to_str().unwrap_or("[unknown]")) + match &self.listener { + Some(listener) => { + let path = listener.local_addr().expect("Could not get local path dir"); + let pathname = path.as_pathname().and_then(|p| p.canonicalize().ok()).expect("Could not canonicalize path dir"); + write!(f, "http+unix://{}", pathname.display()) } + None => match &self.path { + Some(path) => write!(f, "http+unix://{}", path.display()), + None => write!(f, "Not listening. Did you forget to call `Listener::bind`?"), + }, } } } + +fn unix_socket_addr_to_string(result: io::Result) -> Option { + result.ok().and_then(|addr| { + if let Some(pathname) = addr.as_pathname().and_then(|p| p.canonicalize().ok()) { + Some(format!("http+unix://{}", pathname.display())) + } else { + None + } + }) +} From 5fa176527331c7ecc4a867eff87bd73ef6bfe520 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 21:44:26 +0100 Subject: [PATCH 08/10] Update log.rs --- tests/log.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/log.rs b/tests/log.rs index 263221bbe..2ef621a89 100644 --- a/tests/log.rs +++ b/tests/log.rs @@ -16,7 +16,7 @@ async fn test_server_listen(logger: &mut logtest::Logger) { let port = test_utils::find_port().await; let app = tide::new(); let res = app - .listen(("localhost", port)) + .listen(("127.0.0.1", port)) .timeout(Duration::from_millis(60)) .await; assert!(res.is_err()); @@ -26,7 +26,7 @@ async fn test_server_listen(logger: &mut logtest::Logger) { .unwrap(); assert_eq!( record.args(), - format!("Server listening on http://[::1]:{}", port) + format!("Server listening on http://127.0.0.1:{}", port) ); } From de9ce529c1d5872227c269fdc2682a4dabbe4725 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 22:07:15 +0100 Subject: [PATCH 09/10] cargo fmt --- src/listener/unix_listener.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/listener/unix_listener.rs b/src/listener/unix_listener.rs index 48564f44c..8b18319a8 100644 --- a/src/listener/unix_listener.rs +++ b/src/listener/unix_listener.rs @@ -6,9 +6,9 @@ use crate::{log, Server}; use std::fmt::{self, Display, Formatter}; use async_std::os::unix::net::{self, SocketAddr, UnixStream}; +use async_std::path::PathBuf; use async_std::prelude::*; use async_std::{io, task}; -use async_std::path::PathBuf; /// This represents a tide [Listener](crate::listener::Listener) that /// wraps an [async_std::os::unix::net::UnixListener]. It is implemented as an @@ -146,11 +146,14 @@ impl Display for UnixListener { match &self.listener { Some(listener) => { let path = listener.local_addr().expect("Could not get local path dir"); - let pathname = path.as_pathname().and_then(|p| p.canonicalize().ok()).expect("Could not canonicalize path dir"); + let pathname = path + .as_pathname() + .and_then(|p| p.canonicalize().ok()) + .expect("Could not canonicalize path dir"); write!(f, "http+unix://{}", pathname.display()) } None => match &self.path { - Some(path) => write!(f, "http+unix://{}", path.display()), + Some(path) => write!(f, "http+unix://{}", path.display()), None => write!(f, "Not listening. Did you forget to call `Listener::bind`?"), }, } From 6daa700ebfe652c750f43dcfd077fa958b757962 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 11 Nov 2020 22:13:47 +0100 Subject: [PATCH 10/10] cargo clippy --- src/listener/tcp_listener.rs | 2 +- src/listener/unix_listener.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/listener/tcp_listener.rs b/src/listener/tcp_listener.rs index 8e755b208..9ca3585f8 100644 --- a/src/listener/tcp_listener.rs +++ b/src/listener/tcp_listener.rs @@ -70,7 +70,7 @@ where assert!(self.server.is_none(), "`bind` should only be called once"); self.server = Some(server); - if let None = self.listener { + if self.listener.is_none() { let addrs = self .addrs .take() diff --git a/src/listener/unix_listener.rs b/src/listener/unix_listener.rs index 8b18319a8..6b9c95221 100644 --- a/src/listener/unix_listener.rs +++ b/src/listener/unix_listener.rs @@ -71,7 +71,7 @@ where assert!(self.server.is_none(), "`bind` should only be called once"); self.server = Some(server); - if let None = self.listener { + if self.listener.is_none() { let path = self.path.take().expect("`bind` should only be called once"); let listener = net::UnixListener::bind(path).await?; self.listener = Some(listener);