diff --git a/Cargo.lock b/Cargo.lock index 55f2d1d13fb..6d486f19b5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3231,10 +3231,13 @@ name = "libp2p-server" version = "0.12.5" dependencies = [ "base64 0.21.5", + "bytes", "clap", "futures", "futures-timer", - "hyper 0.14.27", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "libp2p", "prometheus-client", "serde", diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index 15e36576c7d..f2614927c1c 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -13,16 +13,19 @@ license = "MIT" [dependencies] base64 = "0.21" clap = { version = "4.4.12", features = ["derive"] } +bytes = "1.5.0" futures = "0.3" futures-timer = "3" -hyper = { version = "0.14", features = ["server", "tcp", "http1"] } +http-body-util = "0.1.0" +hyper = { version = "1.1", features = ["full"]} +hyper-util = { version = "0.1.2", features = ["full"] } libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] } prometheus-client = { workspace = true } serde = "1.0.193" -serde_derive = "1.0.125" +serde_derive = "1.0.193" serde_json = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } -tracing = "0.1.37" +tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["env-filter"] } zeroize = "1" diff --git a/misc/server/src/http_service.rs b/misc/server/src/http_service.rs index 7905933fbf5..ba4a54208c3 100644 --- a/misc/server/src/http_service.rs +++ b/misc/server/src/http_service.rs @@ -18,29 +18,48 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use bytes::Bytes; +use http_body_util::Full; use hyper::http::StatusCode; +use hyper::server::conn::http1; use hyper::service::Service; -use hyper::{Body, Method, Request, Response, Server}; +use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; +use hyper_util::rt::TokioIo; use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; +use tokio::net::TcpListener; const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; pub(crate) async fn metrics_server( registry: Registry, metrics_path: String, -) -> Result<(), hyper::Error> { +) -> Result<(), std::io::Error> { // Serve on localhost. - let addr = ([0, 0, 0, 0], 8888).into(); - - let server = Server::bind(&addr).serve(MakeMetricService::new(registry, metrics_path.clone())); - tracing::info!(metrics_server=%format!("http://{}{}", server.local_addr(), metrics_path)); - server.await?; - Ok(()) + let addr: SocketAddr = ([0, 0, 0, 0], 8888).into(); + + tracing::info!(metrics_server=%format!("http://{:?}{}", addr, metrics_path)); + let make_metrics_service = MakeMetricService::new(registry, metrics_path.clone()); + let listener = TcpListener::bind(addr).await?; + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + let make_metrics_service_clone = make_metrics_service.clone(); + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .serve_connection(io, make_metrics_service_clone) + .await + { + tracing::error!("server error: {}", err); + } + }); + } } + +#[derive(Debug, Clone, Default)] pub(crate) struct MetricService { reg: Arc>, metrics_path: String, @@ -52,8 +71,8 @@ impl MetricService { fn get_reg(&mut self) -> SharedRegistry { Arc::clone(&self.reg) } - fn respond_with_metrics(&mut self) -> Response { - let mut response: Response = Response::default(); + fn respond_with_metrics(&mut self) -> Response> { + let mut response: Response> = Response::default(); response.headers_mut().insert( hyper::header::CONTENT_TYPE, @@ -61,45 +80,44 @@ impl MetricService { ); let reg = self.get_reg(); - encode(&mut response.body_mut(), ®.lock().unwrap()).unwrap(); + let mut inner_str = String::new(); + encode(&mut inner_str, ®.lock().unwrap()).unwrap(); + *response.body_mut() = Full::new(Bytes::from(inner_str)); *response.status_mut() = StatusCode::OK; response } - fn respond_with_404_not_found(&mut self) -> Response { + fn respond_with_404_not_found(&mut self) -> Response> { Response::builder() .status(StatusCode::NOT_FOUND) - .body(format!( + .body(Full::new(Bytes::from(format!( "Not found try localhost:[port]/{}", self.metrics_path - )) + )))) .unwrap() } } -impl Service> for MetricService { - type Response = Response; +impl Service> for MetricService { + type Response = Response>; type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { let req_path = req.uri().path(); let req_method = req.method(); let resp = if (req_method == Method::GET) && (req_path == self.metrics_path) { // Encode and serve metrics from registry. - self.respond_with_metrics() + self.clone().respond_with_metrics() } else { - self.respond_with_404_not_found() + self.clone().respond_with_404_not_found() }; Box::pin(async { Ok(resp) }) } } +#[derive(Debug, Clone)] pub(crate) struct MakeMetricService { reg: SharedRegistry, metrics_path: String, @@ -114,19 +132,15 @@ impl MakeMetricService { } } -impl Service for MakeMetricService { - type Response = MetricService; +impl Service> for MakeMetricService { + type Response = Response>; type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: T) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { let reg = self.reg.clone(); let metrics_path = self.metrics_path.clone(); - let fut = async move { Ok(MetricService { reg, metrics_path }) }; + let fut = async move { Ok(MetricService { reg, metrics_path }.call(req).await.unwrap()) }; Box::pin(fut) } }