Skip to content

Commit

Permalink
update hyper version to 1.0 in libp2p-server
Browse files Browse the repository at this point in the history
  • Loading branch information
getong committed Jan 2, 2024
1 parent 9db4500 commit 619b8f8
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions misc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ license = "MIT"
[dependencies]
base64 = "0.21"
clap = { version = "4.4.12", features = ["derive"] }
bytes = "1.5.0"
futures = "0.3"
futures-timer = "3"
hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
http-body-util = "0.1.0"
hyper = { version = "1.1", features = ["full"]}
hyper-util = { version = "0.1.2", features = ["full"] }
libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] }
prometheus-client = { workspace = true }
serde = "1.0.193"
serde_derive = "1.0.125"
serde_derive = "1.0.193"
serde_json = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tracing = "0.1.37"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
zeroize = "1"

Expand Down
78 changes: 46 additions & 32 deletions misc/server/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,48 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::Bytes;
use http_body_util::Full;
use hyper::http::StatusCode;
use hyper::server::conn::http1;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
use hyper_util::rt::TokioIo;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
pub(crate) async fn metrics_server(
registry: Registry,
metrics_path: String,
) -> Result<(), hyper::Error> {
) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([0, 0, 0, 0], 8888).into();

let server = Server::bind(&addr).serve(MakeMetricService::new(registry, metrics_path.clone()));
tracing::info!(metrics_server=%format!("http://{}{}", server.local_addr(), metrics_path));
server.await?;
Ok(())
let addr: SocketAddr = ([0, 0, 0, 0], 8888).into();

tracing::info!(metrics_server=%format!("http://{:?}{}", addr, metrics_path));
let make_metrics_service = MakeMetricService::new(registry, metrics_path.clone());
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let make_metrics_service_clone = make_metrics_service.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, make_metrics_service_clone)
.await
{
tracing::error!("server error: {}", err);
}
});
}
}

#[derive(Debug, Clone, Default)]
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
metrics_path: String,
Expand All @@ -52,54 +71,53 @@ impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
let mut response: Response<String> = Response::default();
fn respond_with_metrics(&mut self) -> Response<Full<Bytes>> {
let mut response: Response<Full<Bytes>> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);

let reg = self.get_reg();
encode(&mut response.body_mut(), &reg.lock().unwrap()).unwrap();
let mut inner_str = String::new();
encode(&mut inner_str, &reg.lock().unwrap()).unwrap();
*response.body_mut() = Full::new(Bytes::from(inner_str));

*response.status_mut() = StatusCode::OK;

response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
fn respond_with_404_not_found(&mut self) -> Response<Full<Bytes>> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(format!(
.body(Full::new(Bytes::from(format!(
"Not found try localhost:[port]/{}",
self.metrics_path
))
))))
.unwrap()
}
}

impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
impl Service<Request<IncomingBody>> for MetricService {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&self, req: Request<IncomingBody>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == self.metrics_path) {
// Encode and serve metrics from registry.
self.respond_with_metrics()
self.clone().respond_with_metrics()
} else {
self.respond_with_404_not_found()
self.clone().respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
}

#[derive(Debug, Clone)]
pub(crate) struct MakeMetricService {
reg: SharedRegistry,
metrics_path: String,
Expand All @@ -114,19 +132,15 @@ impl MakeMetricService {
}
}

impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
impl Service<Request<IncomingBody>> for MakeMetricService {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: T) -> Self::Future {
fn call(&self, req: Request<IncomingBody>) -> Self::Future {
let reg = self.reg.clone();
let metrics_path = self.metrics_path.clone();
let fut = async move { Ok(MetricService { reg, metrics_path }) };
let fut = async move { Ok(MetricService { reg, metrics_path }.call(req).await.unwrap()) };
Box::pin(fut)
}
}

0 comments on commit 619b8f8

Please sign in to comment.