diff --git a/src/listener/concurrent_listener.rs b/src/listener/concurrent_listener.rs index 1cff9b0d7..486b21e68 100644 --- a/src/listener/concurrent_listener.rs +++ b/src/listener/concurrent_listener.rs @@ -1,4 +1,4 @@ -use crate::listener::{Listener, ToListener}; +use crate::listener::{ListenInfo, Listener, ToListener}; use crate::Server; use std::fmt::{self, Debug, Display, Formatter}; @@ -33,12 +33,14 @@ use futures_util::stream::{futures_unordered::FuturesUnordered, StreamExt}; ///``` #[derive(Default)] -pub struct ConcurrentListener(Vec>>); +pub struct ConcurrentListener { + listeners: Vec>>, +} impl ConcurrentListener { /// creates a new ConcurrentListener pub fn new() -> Self { - Self(vec![]) + Self { listeners: vec![] } } /// Adds any [`ToListener`](crate::listener::ToListener) to this @@ -55,8 +57,11 @@ impl ConcurrentListener { /// # std::mem::drop(tide::new().listen(listener)); // for the State generic /// # Ok(()) } /// ``` - pub fn add>(&mut self, listener: TL) -> io::Result<()> { - self.0.push(Box::new(listener.to_listener()?)); + pub fn add(&mut self, listener: L) -> io::Result<()> + where + L: ToListener, + { + self.listeners.push(Box::new(listener.to_listener()?)); Ok(()) } @@ -71,20 +76,32 @@ impl ConcurrentListener { /// .with_listener(async_std::net::TcpListener::bind("127.0.0.1:8081").await?), /// ).await?; /// # Ok(()) }) } - pub fn with_listener>(mut self, listener: TL) -> Self { + pub fn with_listener(mut self, listener: L) -> Self + where + L: ToListener, + { self.add(listener).expect("Unable to add listener"); self } } #[async_trait::async_trait] -impl Listener for ConcurrentListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { +impl Listener for ConcurrentListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self, app: Server) -> io::Result<()> { + for listener in self.listeners.iter_mut() { + listener.bind(app.clone()).await?; + } + Ok(()) + } + + async fn accept(&mut self) -> io::Result<()> { let mut futures_unordered = FuturesUnordered::new(); - for listener in self.0.iter_mut() { - let app = app.clone(); - futures_unordered.push(listener.listen(app)); + for listener in self.listeners.iter_mut() { + futures_unordered.push(listener.accept()); } while let Some(result) = futures_unordered.next().await { @@ -92,18 +109,26 @@ impl Listener for ConcurrentListene } Ok(()) } + + fn info(&self) -> Vec { + self.listeners + .iter() + .map(|listener| listener.info().into_iter()) + .flatten() + .collect() + } } impl Debug for ConcurrentListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.0) + write!(f, "{:?}", self.listeners) } } impl Display for ConcurrentListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let string = self - .0 + .listeners .iter() .map(|l| l.to_string()) .collect::>() diff --git a/src/listener/failover_listener.rs b/src/listener/failover_listener.rs index 4ab1bd242..d81658c77 100644 --- a/src/listener/failover_listener.rs +++ b/src/listener/failover_listener.rs @@ -5,6 +5,8 @@ use std::fmt::{self, Debug, Display, Formatter}; use async_std::io; +use crate::listener::ListenInfo; + /// FailoverListener allows tide to attempt to listen in a sequential /// order to any number of ports/addresses. The first successful /// listener is used. @@ -31,14 +33,22 @@ use async_std::io; /// }) ///} ///``` - #[derive(Default)] -pub struct FailoverListener(Vec>>); +pub struct FailoverListener { + listeners: Vec>>>, + index: Option, +} -impl FailoverListener { +impl FailoverListener +where + State: Clone + Send + Sync + 'static, +{ /// creates a new FailoverListener pub fn new() -> Self { - Self(vec![]) + Self { + listeners: vec![], + index: None, + } } /// Adds any [`ToListener`](crate::listener::ToListener) to this @@ -57,8 +67,11 @@ impl FailoverListener { /// # std::mem::drop(tide::new().listen(listener)); // for the State generic /// # Ok(()) } /// ``` - pub fn add>(&mut self, listener: TL) -> io::Result<()> { - self.0.push(Box::new(listener.to_listener()?)); + pub fn add(&mut self, listener: L) -> io::Result<()> + where + L: ToListener, + { + self.listeners.push(Some(Box::new(listener.to_listener()?))); Ok(()) } @@ -73,21 +86,30 @@ impl FailoverListener { /// .with_listener(("localhost", 8081)), /// ).await?; /// # Ok(()) }) } - pub fn with_listener>(mut self, listener: TL) -> Self { + pub fn with_listener(mut self, listener: L) -> Self + where + L: ToListener, + { self.add(listener).expect("Unable to add listener"); self } } #[async_trait::async_trait] -impl Listener for FailoverListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { - for listener in self.0.iter_mut() { - let app = app.clone(); - match listener.listen(app).await { - Ok(_) => return Ok(()), +impl Listener for FailoverListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self, app: Server) -> io::Result<()> { + for (index, listener) in self.listeners.iter_mut().enumerate() { + let listener = listener.as_deref_mut().expect("bind called twice"); + match listener.bind(app.clone()).await { + Ok(_) => { + self.index = Some(index); + return Ok(()); + } Err(e) => { - crate::log::info!("unable to listen", { + crate::log::info!("unable to bind", { listener: listener.to_string(), error: e.to_string() }); @@ -100,20 +122,47 @@ impl Listener for FailoverListener< "unable to bind to any supplied listener spec", )) } + + async fn accept(&mut self) -> io::Result<()> { + match self.index { + Some(index) => { + let mut listener = self.listeners[index].take().expect("accept called twice"); + listener.accept().await?; + Ok(()) + } + None => Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "unable to listen to any supplied listener spec", + )), + } + } + + fn info(&self) -> Vec { + match self.index { + Some(index) => match self.listeners.get(index) { + Some(Some(listener)) => listener.info(), + _ => vec![], + }, + None => vec![], + } + } } impl Debug for FailoverListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.0) + write!(f, "{:?}", self.listeners) } } impl Display for FailoverListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let string = self - .0 + .listeners .iter() - .map(|l| l.to_string()) + .map(|l| match l { + Some(l) => l.to_string(), + None => String::new(), + }) .collect::>() .join(", "); diff --git a/src/listener/mod.rs b/src/listener/mod.rs index 82033de15..2d469872d 100644 --- a/src/listener/mod.rs +++ b/src/listener/mod.rs @@ -12,8 +12,12 @@ mod to_listener_impls; #[cfg(all(unix, feature = "h1-server"))] mod unix_listener; -use crate::Server; +use std::fmt::{Debug, Display}; + use async_std::io; +use async_trait::async_trait; + +use crate::Server; pub use concurrent_listener::ConcurrentListener; pub use failover_listener::FailoverListener; @@ -26,18 +30,46 @@ pub(crate) use tcp_listener::TcpListener; #[cfg(all(unix, feature = "h1-server"))] pub(crate) use unix_listener::UnixListener; -/// The Listener trait represents an implementation of http transport -/// for a tide application. In order to provide a Listener to tide, -/// you will also need to implement at least one [`ToListener`](crate::listener::ToListener) that +/// The Listener trait represents an implementation of http transport for a tide +/// application. In order to provide a Listener to tide, you will also need to +/// implement at least one [`ToListener`](crate::listener::ToListener) that /// outputs your Listener type. -#[async_trait::async_trait] -pub trait Listener: - std::fmt::Debug + std::fmt::Display + Send + Sync + 'static +#[async_trait] +pub trait Listener: Debug + Display + Send + Sync + 'static +where + State: Send + Sync + 'static, +{ + /// Bind the listener. This starts the listening process by opening the + /// necessary network ports, but not yet accepting incoming connections. This + /// method must be called before `accept`. + async fn bind(&mut self, app: Server) -> io::Result<()>; + + /// Start accepting incoming connections. This method must be called only + /// after `bind` has succeeded. + async fn accept(&mut self) -> io::Result<()>; + + /// Expose information about the connection. This should always return valid + /// data after `bind` has succeeded. + fn info(&self) -> Vec; +} + +#[async_trait] +impl Listener for Box +where + L: Listener, + State: Send + Sync + 'static, { - /// This is the primary entrypoint for the Listener trait. listen - /// is called exactly once, and is expected to spawn tasks for - /// each incoming connection. - async fn listen(&mut self, app: Server) -> io::Result<()>; + async fn bind(&mut self, app: Server) -> io::Result<()> { + self.as_mut().bind(app).await + } + + async fn accept(&mut self) -> io::Result<()> { + self.as_mut().accept().await + } + + fn info(&self) -> Vec { + self.as_ref().info() + } } /// crate-internal shared logic used by tcp and unix listeners to @@ -52,3 +84,50 @@ pub(crate) fn is_transient_error(e: &io::Error) -> bool { ConnectionRefused | ConnectionAborted | ConnectionReset ) } + +/// Information about the `Listener`. +/// +/// See [`Report`](../listener/trait.Report.html) for more. +#[derive(Debug, Clone)] +pub struct ListenInfo { + conn_string: String, + transport: String, + tls: bool, +} + +impl ListenInfo { + /// Create a new instance of `ListenInfo`. + /// + /// This method should only be called when implementing a new Tide `listener` + /// strategy. + pub fn new(conn_string: String, transport: String, tls: bool) -> Self { + Self { + conn_string, + transport, + tls, + } + } + + /// Get the connection string. + pub fn connection(&self) -> &str { + self.conn_string.as_str() + } + + /// The underlying transport this connection listens on. + /// + /// Examples are: "tcp", "uds", etc. + pub fn transport(&self) -> &str { + self.transport.as_str() + } + + /// Is the connection encrypted? + pub fn is_encrypted(&self) -> bool { + self.tls + } +} + +impl Display for ListenInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.conn_string) + } +} diff --git a/src/listener/parsed_listener.rs b/src/listener/parsed_listener.rs index 4b0e186a9..ad2926a10 100644 --- a/src/listener/parsed_listener.rs +++ b/src/listener/parsed_listener.rs @@ -1,10 +1,10 @@ #[cfg(unix)] use super::UnixListener; -use super::{Listener, TcpListener}; +use super::{ListenInfo, Listener, TcpListener}; use crate::Server; use async_std::io; -use std::fmt::{self, Display, Formatter}; +use std::fmt::{self, Debug, Display, Formatter}; /// This is an enum that contains variants for each of the listeners /// that can be parsed from a string. This is used as the associated @@ -13,14 +13,23 @@ use std::fmt::{self, Display, Formatter}; /// /// This is currently crate-visible only, and tide users are expected /// to create these through [ToListener](crate::ToListener) conversions. -#[derive(Debug)] -pub enum ParsedListener { +pub enum ParsedListener { #[cfg(unix)] - Unix(UnixListener), - Tcp(TcpListener), + Unix(UnixListener), + Tcp(TcpListener), } -impl Display for ParsedListener { +impl Debug for ParsedListener { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + #[cfg(unix)] + ParsedListener::Unix(unix) => Debug::fmt(unix, f), + ParsedListener::Tcp(tcp) => Debug::fmt(tcp, f), + } + } +} + +impl Display for ParsedListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { #[cfg(unix)] @@ -31,12 +40,31 @@ impl Display for ParsedListener { } #[async_trait::async_trait] -impl Listener for ParsedListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { +impl Listener for ParsedListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self, server: Server) -> io::Result<()> { + match self { + #[cfg(unix)] + Self::Unix(u) => u.bind(server).await, + Self::Tcp(t) => t.bind(server).await, + } + } + + async fn accept(&mut self) -> io::Result<()> { + match self { + #[cfg(unix)] + Self::Unix(u) => u.accept().await, + Self::Tcp(t) => t.accept().await, + } + } + + fn info(&self) -> Vec { match self { #[cfg(unix)] - Self::Unix(u) => u.listen(app).await, - Self::Tcp(t) => t.listen(app).await, + ParsedListener::Unix(unix) => unix.info(), + ParsedListener::Tcp(tcp) => tcp.info(), } } } diff --git a/src/listener/tcp_listener.rs b/src/listener/tcp_listener.rs index db68530ee..9ca3585f8 100644 --- a/src/listener/tcp_listener.rs +++ b/src/listener/tcp_listener.rs @@ -1,4 +1,4 @@ -use super::is_transient_error; +use super::{is_transient_error, ListenInfo}; use crate::listener::Listener; use crate::{log, Server}; @@ -17,38 +17,31 @@ use async_std::{io, task}; /// /// This is currently crate-visible only, and tide users are expected /// to create these through [ToListener](crate::ToListener) conversions. -#[derive(Debug)] -pub enum TcpListener { - FromListener(net::TcpListener), - FromAddrs(Vec, Option), +pub struct TcpListener { + addrs: Option>, + listener: Option, + server: Option>, + info: Option, } -impl TcpListener { +impl TcpListener { pub fn from_addrs(addrs: Vec) -> Self { - Self::FromAddrs(addrs, None) + Self { + addrs: Some(addrs), + listener: None, + server: None, + info: None, + } } pub fn from_listener(tcp_listener: impl Into) -> Self { - Self::FromListener(tcp_listener.into()) - } - - fn listener(&self) -> io::Result<&net::TcpListener> { - match self { - Self::FromAddrs(_, Some(listener)) => Ok(listener), - Self::FromListener(listener) => Ok(listener), - Self::FromAddrs(addrs, None) => Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - format!("unable to connect to {:?}", addrs), - )), + Self { + addrs: None, + listener: Some(tcp_listener.into()), + server: None, + info: None, } } - - async fn connect(&mut self) -> io::Result<()> { - if let Self::FromAddrs(addrs, listener @ None) = self { - *listener = Some(net::TcpListener::bind(addrs.as_slice()).await?); - } - Ok(()) - } } fn handle_tcp(app: Server, stream: TcpStream) { @@ -69,11 +62,41 @@ fn handle_tcp(app: Server, stream: } #[async_trait::async_trait] -impl Listener for TcpListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { - self.connect().await?; - let listener = self.listener()?; - crate::log::info!("Server listening on {}", self); +impl Listener for TcpListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self, server: Server) -> io::Result<()> { + assert!(self.server.is_none(), "`bind` should only be called once"); + self.server = Some(server); + + if self.listener.is_none() { + let addrs = self + .addrs + .take() + .expect("`bind` should only be called once"); + let listener = net::TcpListener::bind(addrs.as_slice()).await?; + self.listener = Some(listener); + } + + // Format the listen information. + let conn_string = format!("{}", self); + let transport = "tcp".to_owned(); + let tls = false; + self.info = Some(ListenInfo::new(conn_string, transport, tls)); + + Ok(()) + } + + async fn accept(&mut self) -> io::Result<()> { + let server = self + .server + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); + let listener = self + .listener + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); let mut incoming = listener.incoming(); @@ -88,35 +111,53 @@ impl Listener for TcpListener { } Ok(stream) => { - handle_tcp(app.clone(), stream); + handle_tcp(server.clone(), stream); } }; } Ok(()) } + + fn info(&self) -> Vec { + match &self.info { + Some(info) => vec![info.clone()], + None => vec![], + } + } } -impl Display for TcpListener { +impl fmt::Debug for TcpListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Self::FromListener(l) | Self::FromAddrs(_, Some(l)) => write!( - f, - "http://{}", - l.local_addr() - .ok() - .map(|a| a.to_string()) - .as_deref() - .unwrap_or("[unknown]") - ), - Self::FromAddrs(addrs, None) => write!( - f, - "{}", - addrs - .iter() - .map(|a| format!("http://{}", a)) - .collect::>() - .join(", ") - ), + f.debug_struct("TcpListener") + .field(&"listener", &self.listener) + .field(&"addrs", &self.addrs) + .field( + &"server", + if self.server.is_some() { + &"Some(Server)" + } else { + &"None" + }, + ) + .finish() + } +} + +impl Display for TcpListener { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let http_fmt = |a| format!("http://{}", a); + match &self.listener { + Some(listener) => { + let addr = listener.local_addr().expect("Could not get local addr"); + write!(f, "{}", http_fmt(&addr)) + } + None => match &self.addrs { + Some(addrs) => { + let addrs = addrs.iter().map(http_fmt).collect::>().join(", "); + write!(f, "{}", addrs) + } + None => write!(f, "Not listening. Did you forget to call `Listener::bind`?"), + }, } } } diff --git a/src/listener/to_listener_impls.rs b/src/listener/to_listener_impls.rs index a1b406d1f..579b5d9ba 100644 --- a/src/listener/to_listener_impls.rs +++ b/src/listener/to_listener_impls.rs @@ -5,8 +5,11 @@ use crate::http::url::Url; use async_std::io; use std::net::ToSocketAddrs; -impl ToListener for Url { - type Listener = ParsedListener; +impl ToListener for Url +where + State: Clone + Send + Sync + 'static, +{ + type Listener = ParsedListener; fn to_listener(self) -> io::Result { match self.scheme() { @@ -48,15 +51,21 @@ impl ToListener for Url { } } -impl ToListener for String { - type Listener = ParsedListener; +impl ToListener for String +where + State: Clone + Send + Sync + 'static, +{ + type Listener = ParsedListener; fn to_listener(self) -> io::Result { ToListener::::to_listener(self.as_str()) } } -impl ToListener for &str { - type Listener = ParsedListener; +impl ToListener for &str +where + State: Clone + Send + Sync + 'static, +{ + type Listener = ParsedListener; fn to_listener(self) -> io::Result { if let Ok(socket_addrs) = self.to_socket_addrs() { @@ -75,37 +84,52 @@ impl ToListener for &str { } #[cfg(unix)] -impl ToListener for async_std::path::PathBuf { - type Listener = UnixListener; +impl ToListener for async_std::path::PathBuf +where + State: Clone + Send + Sync + 'static, +{ + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_path(self)) } } #[cfg(unix)] -impl ToListener for std::path::PathBuf { - type Listener = UnixListener; +impl ToListener for std::path::PathBuf +where + State: Clone + Send + Sync + 'static, +{ + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_path(self)) } } -impl ToListener for async_std::net::TcpListener { - type Listener = TcpListener; +impl ToListener for async_std::net::TcpListener +where + State: Clone + Send + Sync + 'static, +{ + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_listener(self)) } } -impl ToListener for std::net::TcpListener { - type Listener = TcpListener; +impl ToListener for std::net::TcpListener +where + State: Clone + Send + Sync + 'static, +{ + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_listener(self)) } } -impl ToListener for (&str, u16) { - type Listener = TcpListener; +impl ToListener for (&str, u16) +where + State: Clone + Send + Sync + 'static, +{ + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_addrs(self.to_socket_addrs()?.collect())) @@ -113,24 +137,31 @@ impl ToListener for (&str, u16) { } #[cfg(unix)] -impl ToListener - for async_std::os::unix::net::UnixListener +impl ToListener for async_std::os::unix::net::UnixListener +where + State: Clone + Send + Sync + 'static, { - type Listener = UnixListener; + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_listener(self)) } } #[cfg(unix)] -impl ToListener for std::os::unix::net::UnixListener { - type Listener = UnixListener; +impl ToListener for std::os::unix::net::UnixListener +where + State: Clone + Send + Sync + 'static, +{ + type Listener = UnixListener; fn to_listener(self) -> io::Result { Ok(UnixListener::from_listener(self)) } } -impl ToListener for TcpListener { +impl ToListener for TcpListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) @@ -138,42 +169,61 @@ impl ToListener for TcpListener { } #[cfg(unix)] -impl ToListener for UnixListener { +impl ToListener for UnixListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for ConcurrentListener { +impl ToListener for ConcurrentListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for ParsedListener { +impl ToListener for ParsedListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for FailoverListener { +impl ToListener for FailoverListener +where + State: Clone + Send + Sync + 'static, +{ type Listener = Self; fn to_listener(self) -> io::Result { Ok(self) } } -impl ToListener for std::net::SocketAddr { - type Listener = TcpListener; +impl ToListener for std::net::SocketAddr +where + State: Clone + Send + Sync + 'static, +{ + type Listener = TcpListener; fn to_listener(self) -> io::Result { Ok(TcpListener::from_addrs(vec![self])) } } -impl, State: Clone + Send + Sync + 'static> ToListener for Vec { +impl ToListener for Vec +where + L: ToListener, + State: Clone + Send + Sync + 'static, +{ type Listener = ConcurrentListener; fn to_listener(self) -> io::Result { let mut concurrent_listener = ConcurrentListener::new(); @@ -188,55 +238,31 @@ impl, State: Clone + Send + Sync + 'static> ToListener>(listener: TL) -> io::Result { + fn listen>(listener: L) -> io::Result { listener.to_listener() } #[test] fn url_to_tcp_listener() { let listener = listen(Url::parse("http://localhost:8000").unwrap()).unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); - assert!(listener.to_string().contains("http://127.0.0.1:8000")); + assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen(Url::parse("tcp://localhost:8000").unwrap()).unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen(Url::parse("http://127.0.0.1").unwrap()).unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!(listener.to_string(), "http://127.0.0.1:80"); } #[test] fn str_url_to_tcp_listener() { let listener = listen("tcp://localhost:8000").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen("tcp://localhost:8000").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains("http://127.0.0.1:8000")); let listener = listen("tcp://127.0.0.1").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!(listener.to_string(), "http://127.0.0.1:80"); } @@ -247,24 +273,12 @@ mod parse_tests { #[test] fn str_url_to_unix_listener() { let listener = listen("http+unix:///var/run/tide/socket").unwrap(); - assert!(matches!( - listener, - ParsedListener::Unix(UnixListener::FromPath(_, None)) - )); assert_eq!("http+unix:///var/run/tide/socket", listener.to_string()); let listener = listen("http+unix://./socket").unwrap(); - assert!(matches!( - listener, - ParsedListener::Unix(UnixListener::FromPath(_, None)) - )); assert_eq!("http+unix://./socket", listener.to_string()); let listener = listen("http+unix://socket").unwrap(); - assert!(matches!( - listener, - ParsedListener::Unix(UnixListener::FromPath(_, None)) - )); assert_eq!("http+unix://socket", listener.to_string()); } @@ -316,24 +330,12 @@ mod parse_tests { #[test] fn str_to_socket_addr() { let listener = listen("127.0.0.1:1312").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!("http://127.0.0.1:1312", listener.to_string()); let listener = listen("[::1]:1312").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert_eq!("http://[::1]:1312", listener.to_string()); let listener = listen("localhost:3000").unwrap(); - assert!(matches!( - listener, - ParsedListener::Tcp(TcpListener::FromAddrs(_, None)) - )); assert!(listener.to_string().contains(":3000")); } diff --git a/src/listener/unix_listener.rs b/src/listener/unix_listener.rs index 72aff852d..6b9c95221 100644 --- a/src/listener/unix_listener.rs +++ b/src/listener/unix_listener.rs @@ -1,4 +1,4 @@ -use super::is_transient_error; +use super::{is_transient_error, ListenInfo}; use crate::listener::Listener; use crate::{log, Server}; @@ -6,64 +6,45 @@ use crate::{log, Server}; use std::fmt::{self, Display, Formatter}; use async_std::os::unix::net::{self, SocketAddr, UnixStream}; +use async_std::path::PathBuf; use async_std::prelude::*; -use async_std::{io, path::PathBuf, task}; +use async_std::{io, task}; /// This represents a tide [Listener](crate::listener::Listener) that /// wraps an [async_std::os::unix::net::UnixListener]. It is implemented as an /// enum in order to allow creation of a tide::listener::UnixListener -/// from a [PathBuf] that has not yet been opened/bound OR from a bound +/// from a [`PathBuf`] spec that has not yet been bound OR from a bound /// [async_std::os::unix::net::UnixListener]. /// /// This is currently crate-visible only, and tide users are expected /// to create these through [ToListener](crate::ToListener) conversions. -#[derive(Debug)] -pub enum UnixListener { - FromPath(PathBuf, Option), - FromListener(net::UnixListener), +pub struct UnixListener { + path: Option, + listener: Option, + server: Option>, + info: Option, } -impl UnixListener { +impl UnixListener { pub fn from_path(path: impl Into) -> Self { - Self::FromPath(path.into(), None) - } - - pub fn from_listener(listener: impl Into) -> Self { - Self::FromListener(listener.into()) - } - - fn listener(&self) -> io::Result<&net::UnixListener> { - match self { - Self::FromPath(_, Some(listener)) => Ok(listener), - Self::FromListener(listener) => Ok(listener), - Self::FromPath(path, None) => Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - format!( - "unable to connect to {}", - path.to_str().unwrap_or("[unknown]") - ), - )), + Self { + path: Some(path.into()), + listener: None, + server: None, + info: None, } } - async fn connect(&mut self) -> io::Result<()> { - if let Self::FromPath(path, listener @ None) = self { - *listener = Some(net::UnixListener::bind(path).await?); + pub fn from_listener(unix_listener: impl Into) -> Self { + Self { + path: None, + listener: Some(unix_listener.into()), + server: None, + info: None, } - Ok(()) } } -fn unix_socket_addr_to_string(result: io::Result) -> Option { - result.ok().and_then(|addr| { - if let Some(pathname) = addr.as_pathname().and_then(|p| p.canonicalize().ok()) { - Some(format!("http+unix://{}", pathname.display())) - } else { - None - } - }) -} - fn handle_unix(app: Server, stream: UnixStream) { task::spawn(async move { let local_addr = unix_socket_addr_to_string(stream.local_addr()); @@ -82,11 +63,39 @@ fn handle_unix(app: Server, stream: } #[async_trait::async_trait] -impl Listener for UnixListener { - async fn listen(&mut self, app: Server) -> io::Result<()> { - self.connect().await?; - crate::log::info!("Server listening on {}", self); - let listener = self.listener()?; +impl Listener for UnixListener +where + State: Clone + Send + Sync + 'static, +{ + async fn bind(&mut self, server: Server) -> io::Result<()> { + assert!(self.server.is_none(), "`bind` should only be called once"); + self.server = Some(server); + + if self.listener.is_none() { + let path = self.path.take().expect("`bind` should only be called once"); + let listener = net::UnixListener::bind(path).await?; + self.listener = Some(listener); + } + + // Format the listen information. + let conn_string = format!("{}", self); + let transport = "uds".to_owned(); + let tls = false; + self.info = Some(ListenInfo::new(conn_string, transport, tls)); + + Ok(()) + } + + async fn accept(&mut self) -> io::Result<()> { + let server = self + .server + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); + let listener = self + .listener + .take() + .expect("`Listener::bind` must be called before `Listener::accept`"); + let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { @@ -100,28 +109,63 @@ impl Listener for UnixListener { } Ok(stream) => { - handle_unix(app.clone(), stream); + handle_unix(server.clone(), stream); } }; } - Ok(()) } + + fn info(&self) -> Vec { + match &self.info { + Some(info) => vec![info.clone()], + None => vec![], + } + } +} + +impl fmt::Debug for UnixListener { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("UnixListener") + .field(&"listener", &self.listener) + .field(&"path", &self.path) + .field( + &"server", + if self.server.is_some() { + &"Some(Server)" + } else { + &"None" + }, + ) + .finish() + } } -impl Display for UnixListener { +impl Display for UnixListener { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - Self::FromListener(l) | Self::FromPath(_, Some(l)) => write!( - f, - "{}", - unix_socket_addr_to_string(l.local_addr()) - .as_deref() - .unwrap_or("http+unix://[unknown]") - ), - Self::FromPath(path, None) => { - write!(f, "http+unix://{}", path.to_str().unwrap_or("[unknown]")) + match &self.listener { + Some(listener) => { + let path = listener.local_addr().expect("Could not get local path dir"); + let pathname = path + .as_pathname() + .and_then(|p| p.canonicalize().ok()) + .expect("Could not canonicalize path dir"); + write!(f, "http+unix://{}", pathname.display()) } + None => match &self.path { + Some(path) => write!(f, "http+unix://{}", path.display()), + None => write!(f, "Not listening. Did you forget to call `Listener::bind`?"), + }, } } } + +fn unix_socket_addr_to_string(result: io::Result) -> Option { + result.ok().and_then(|addr| { + if let Some(pathname) = addr.as_pathname().and_then(|p| p.canonicalize().ok()) { + Some(format!("http+unix://{}", pathname.display())) + } else { + None + } + }) +} diff --git a/src/prelude.rs b/src/prelude.rs index 739319081..185b90adf 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,3 +1,4 @@ //! The Tide prelude. pub use crate::convert::{json, Deserialize, Serialize}; +pub use crate::listener::Listener; pub use http_types::Status; diff --git a/src/server.rs b/src/server.rs index e2d2bb556..bff2266a3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -67,7 +67,10 @@ impl Default for Server<()> { } } -impl Server { +impl Server +where + State: Clone + Send + Sync + 'static, +{ /// Create a new Tide server with shared application scoped state. /// /// Application scoped state is useful for storing items @@ -185,9 +188,69 @@ impl Server { self } - /// Asynchronously serve the app with the supplied listener. For more details, see [Listener] and [ToListener] - pub async fn listen>(self, listener: TL) -> io::Result<()> { - listener.to_listener()?.listen(self).await + /// Asynchronously serve the app with the supplied listener. + /// + /// This is a shorthand for calling `Server::bind`, logging the `ListenInfo` + /// instances from `Listener::info`, and then calling `Listener::accept`. + /// + /// # Examples + /// + /// ```no_run + /// # use async_std::task::block_on; + /// # fn main() -> Result<(), std::io::Error> { block_on(async { + /// # + /// let mut app = tide::new(); + /// app.at("/").get(|_| async { Ok("Hello, world!") }); + /// app.listen("127.0.0.1:8080").await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn listen>(self, listener: L) -> io::Result<()> { + let mut listener = listener.to_listener()?; + listener.bind(self).await?; + for info in listener.info().iter() { + log::info!("Server listening on {}", info); + } + listener.accept().await?; + Ok(()) + } + + /// Asynchronously bind the listener. + /// + /// Bind the listener. This starts the listening process by opening the + /// necessary network ports, but not yet accepting incoming connections. + /// `Listener::listen` should be called after this to start accepting + /// connections. + /// + /// When calling `Listener::info` multiple `ListenInfo` instances may be + /// returned. This is useful when using for example `ConcurrentListener` + /// which enables a single server to listen on muliple ports. + /// + /// # Examples + /// + /// ```no_run + /// # use async_std::task::block_on; + /// # fn main() -> Result<(), std::io::Error> { block_on(async { + /// # + /// use tide::prelude::*; + /// + /// let mut app = tide::new(); + /// app.at("/").get(|_| async { Ok("Hello, world!") }); + /// let mut listener = app.bind("127.0.0.1:8080").await?; + /// for info in listener.info().iter() { + /// println!("Server listening on {}", info); + /// } + /// listener.accept().await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn bind>( + self, + listener: L, + ) -> io::Result<>::Listener> { + let mut listener = listener.to_listener()?; + listener.bind(self).await?; + Ok(listener) } /// Respond to a `Request` with a `Response`. diff --git a/tests/log.rs b/tests/log.rs index 263221bbe..2ef621a89 100644 --- a/tests/log.rs +++ b/tests/log.rs @@ -16,7 +16,7 @@ async fn test_server_listen(logger: &mut logtest::Logger) { let port = test_utils::find_port().await; let app = tide::new(); let res = app - .listen(("localhost", port)) + .listen(("127.0.0.1", port)) .timeout(Duration::from_millis(60)) .await; assert!(res.is_err()); @@ -26,7 +26,7 @@ async fn test_server_listen(logger: &mut logtest::Logger) { .unwrap(); assert_eq!( record.args(), - format!("Server listening on http://[::1]:{}", port) + format!("Server listening on http://127.0.0.1:{}", port) ); }