diff --git a/Cargo.lock b/Cargo.lock index 26195f9511971..389478914d237 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1415,6 +1415,7 @@ dependencies = [ "clap", "drain", "futures", + "http-body", "hyper", "ipnet", "k8s-openapi", diff --git a/policy-controller/grpc/Cargo.toml b/policy-controller/grpc/Cargo.toml index c8f2b765d319e..acfdf6a5ada9e 100644 --- a/policy-controller/grpc/Cargo.toml +++ b/policy-controller/grpc/Cargo.toml @@ -11,7 +11,7 @@ async-trait = "0.1" http = "0.2" drain = "0.1" futures = { version = "0.3", default-features = false } -hyper = { version = "0.14", features = ["http2", "server", "tcp"] } +hyper = { version = "0.14", features = ["backports", "deprecated", "http2", "server", "tcp"] } linkerd-policy-controller-core = { path = "../core" } maplit = "1" prost-types = "0.12.6" diff --git a/policy-controller/runtime/Cargo.toml b/policy-controller/runtime/Cargo.toml index 0a70e6fe21269..49eaebee0ba04 100644 --- a/policy-controller/runtime/Cargo.toml +++ b/policy-controller/runtime/Cargo.toml @@ -19,7 +19,8 @@ async-trait = "0.1" drain = "0.1" futures = { version = "0.3", default-features = false } k8s-openapi = { workspace = true } -hyper = { version = "0.14", features = ["http1", "http2", "runtime", "server"] } +http-body = "0.4" +hyper = { version = "0.14", features = ["backports", "deprecated", "http1", "http2", "runtime", "server"] } ipnet = { version = "2", default-features = false } openssl = { version = "0.10.68", optional = true } parking_lot = "0.12" diff --git a/policy-controller/runtime/src/admission.rs b/policy-controller/runtime/src/admission.rs index 71cd8df790391..270fc3b80cff3 100644 --- a/policy-controller/runtime/src/admission.rs +++ b/policy-controller/runtime/src/admission.rs @@ -73,7 +73,8 @@ impl hyper::service::Service> for Admission { let admission = self.clone(); Box::pin(async move { - let bytes = hyper::body::aggregate(req.into_body()).await?; + use http_body::Body as _; + let bytes = req.into_body().collect().await?.aggregate(); let review: Review = match serde_json::from_reader(bytes.reader()) { Ok(review) => review, Err(error) => { diff --git a/policy-test/Cargo.toml b/policy-test/Cargo.toml index 82aa16fd405a4..33a220289cef5 100644 --- a/policy-test/Cargo.toml +++ b/policy-test/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] anyhow = "1" -hyper = { version = "0.14", features = ["client", "http2"] } +hyper = { version = "0.14", features = ["backports", "deprecated", "client", "http2", "runtime"] } futures = { version = "0.3", default-features = false } ipnet = "2" k8s-gateway-api = "0.16" diff --git a/policy-test/src/grpc.rs b/policy-test/src/grpc.rs index b640c0b26724c..2b9624c059bfc 100644 --- a/policy-test/src/grpc.rs +++ b/policy-test/src/grpc.rs @@ -4,15 +4,17 @@ //! forwarding to connect to a running instance. use anyhow::Result; -pub use linkerd2_proxy_api::*; use linkerd2_proxy_api::{ inbound::inbound_server_policies_client::InboundServerPoliciesClient, outbound::outbound_policies_client::OutboundPoliciesClient, }; use linkerd_policy_controller_grpc::workload; use linkerd_policy_controller_k8s_api::{self as k8s, ResourceExt}; +use std::{future::Future, pin::Pin}; use tokio::io; +pub use linkerd2_proxy_api::*; + #[macro_export] macro_rules! assert_is_default_all_unauthenticated { ($config:expr) => { @@ -105,7 +107,7 @@ pub struct OutboundPolicyClient { #[derive(Debug)] struct GrpcHttp { - tx: hyper::client::conn::SendRequest, + tx: hyper::client::conn::http2::SendRequest, } async fn get_policy_controller_pod(client: &kube::Client) -> Result { @@ -338,8 +340,7 @@ impl GrpcHttp { where I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static, { - let (tx, conn) = hyper::client::conn::Builder::new() - .http2_only(true) + let (tx, conn) = hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor) .handshake(io) .await?; tokio::spawn(conn); @@ -350,7 +351,7 @@ impl GrpcHttp { impl hyper::service::Service> for GrpcHttp { type Response = hyper::Response; type Error = hyper::Error; - type Future = hyper::client::conn::ResponseFuture; + type Future = Pin>>>; fn poll_ready( &mut self, @@ -360,6 +361,8 @@ impl hyper::service::Service> for GrpcHttp } fn call(&mut self, req: hyper::Request) -> Self::Future { + use futures::FutureExt; + let (mut parts, body) = req.into_parts(); let mut uri = parts.uri.into_parts(); @@ -371,7 +374,9 @@ impl hyper::service::Service> for GrpcHttp ); parts.uri = hyper::Uri::from_parts(uri).unwrap(); - self.tx.call(hyper::Request::from_parts(parts, body)) + self.tx + .send_request(hyper::Request::from_parts(parts, body)) + .boxed() } } diff --git a/policy-test/src/lib.rs b/policy-test/src/lib.rs index da69d55a10d40..233f3c62f118b 100644 --- a/policy-test/src/lib.rs +++ b/policy-test/src/lib.rs @@ -8,6 +8,8 @@ pub mod grpc; pub mod outbound_api; pub mod web; +mod rt; + use kube::runtime::wait::Condition; use linkerd_policy_controller_k8s_api::{ self as k8s, diff --git a/policy-test/src/rt.rs b/policy-test/src/rt.rs new file mode 100644 index 0000000000000..fc569997aa9bf --- /dev/null +++ b/policy-test/src/rt.rs @@ -0,0 +1,18 @@ +//! HTTP runtime components for Linkerd. + +use hyper::rt::Executor; +use std::future::Future; + +#[derive(Clone, Debug, Default)] +pub struct TokioExecutor; + +impl Executor for TokioExecutor +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + #[inline] + fn execute(&self, f: F) { + tokio::spawn(f); + } +}