From f6468a92f7800ad11f3f90dbc95cb17c8f7aca60 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Thu, 26 Sep 2024 00:00:00 +0000 Subject: [PATCH] chore(deps): address hyper deprecations in policy controller NB: this branch is based upon #13492. see #8733 for more information about migrating to hyper 1.0. this enables the `backports` and `deprecated` feature flags in the hyper dependencies in this project, and addresses warnings. see for more information about these feature flags. largely, the control plane is unaffected by this upgrade, besides the following changes: * one usage of a deprecated `hyper::body::aggregate` function is updated. * a `hyper::rt::Executor` implementation, which spawns tasks onto the tokio runtime, is provided. once we upgrade to hyper 1.0, we can replace this with the executor provided in [`hyper-util`](https://docs.rs/hyper-util/latest/hyper_util/rt/tokio/struct.TokioExecutor.html#impl-Executor%3CFut%3E-for-TokioExecutor). * the `hyper::service::Service>` implementation for `GrpcHttp` now boxes its returned future, on account of `SendRequest` returning an anonymous `impl Future`. * the `policy-test` additionally depends on the `runtime` feature of hyper. this is an artifact of an internal config structure shared by the legacy connection builder and the backported connection builder containing two keep-alive fields that were feature gated prior to 1.0. Signed-off-by: katelyn martin --- Cargo.lock | 1 + policy-controller/grpc/Cargo.toml | 2 +- policy-controller/runtime/Cargo.toml | 3 ++- policy-controller/runtime/src/admission.rs | 3 ++- policy-test/Cargo.toml | 2 +- policy-test/src/grpc.rs | 17 +++++++++++------ policy-test/src/lib.rs | 2 ++ policy-test/src/rt.rs | 18 ++++++++++++++++++ 8 files changed, 38 insertions(+), 10 deletions(-) create mode 100644 policy-test/src/rt.rs diff --git a/Cargo.lock b/Cargo.lock index 26195f9511971..389478914d237 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1415,6 +1415,7 @@ dependencies = [ "clap", "drain", "futures", + "http-body", "hyper", "ipnet", "k8s-openapi", diff --git a/policy-controller/grpc/Cargo.toml b/policy-controller/grpc/Cargo.toml index c8f2b765d319e..acfdf6a5ada9e 100644 --- a/policy-controller/grpc/Cargo.toml +++ b/policy-controller/grpc/Cargo.toml @@ -11,7 +11,7 @@ async-trait = "0.1" http = "0.2" drain = "0.1" futures = { version = "0.3", default-features = false } -hyper = { version = "0.14", features = ["http2", "server", "tcp"] } +hyper = { version = "0.14", features = ["backports", "deprecated", "http2", "server", "tcp"] } linkerd-policy-controller-core = { path = "../core" } maplit = "1" prost-types = "0.12.6" diff --git a/policy-controller/runtime/Cargo.toml b/policy-controller/runtime/Cargo.toml index 0a70e6fe21269..49eaebee0ba04 100644 --- a/policy-controller/runtime/Cargo.toml +++ b/policy-controller/runtime/Cargo.toml @@ -19,7 +19,8 @@ async-trait = "0.1" drain = "0.1" futures = { version = "0.3", default-features = false } k8s-openapi = { workspace = true } -hyper = { version = "0.14", features = ["http1", "http2", "runtime", "server"] } +http-body = "0.4" +hyper = { version = "0.14", features = ["backports", "deprecated", "http1", "http2", "runtime", "server"] } ipnet = { version = "2", default-features = false } openssl = { version = "0.10.68", optional = true } parking_lot = "0.12" diff --git a/policy-controller/runtime/src/admission.rs b/policy-controller/runtime/src/admission.rs index 71cd8df790391..270fc3b80cff3 100644 --- a/policy-controller/runtime/src/admission.rs +++ b/policy-controller/runtime/src/admission.rs @@ -73,7 +73,8 @@ impl hyper::service::Service> for Admission { let admission = self.clone(); Box::pin(async move { - let bytes = hyper::body::aggregate(req.into_body()).await?; + use http_body::Body as _; + let bytes = req.into_body().collect().await?.aggregate(); let review: Review = match serde_json::from_reader(bytes.reader()) { Ok(review) => review, Err(error) => { diff --git a/policy-test/Cargo.toml b/policy-test/Cargo.toml index 82aa16fd405a4..33a220289cef5 100644 --- a/policy-test/Cargo.toml +++ b/policy-test/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] anyhow = "1" -hyper = { version = "0.14", features = ["client", "http2"] } +hyper = { version = "0.14", features = ["backports", "deprecated", "client", "http2", "runtime"] } futures = { version = "0.3", default-features = false } ipnet = "2" k8s-gateway-api = "0.16" diff --git a/policy-test/src/grpc.rs b/policy-test/src/grpc.rs index b640c0b26724c..2b9624c059bfc 100644 --- a/policy-test/src/grpc.rs +++ b/policy-test/src/grpc.rs @@ -4,15 +4,17 @@ //! forwarding to connect to a running instance. use anyhow::Result; -pub use linkerd2_proxy_api::*; use linkerd2_proxy_api::{ inbound::inbound_server_policies_client::InboundServerPoliciesClient, outbound::outbound_policies_client::OutboundPoliciesClient, }; use linkerd_policy_controller_grpc::workload; use linkerd_policy_controller_k8s_api::{self as k8s, ResourceExt}; +use std::{future::Future, pin::Pin}; use tokio::io; +pub use linkerd2_proxy_api::*; + #[macro_export] macro_rules! assert_is_default_all_unauthenticated { ($config:expr) => { @@ -105,7 +107,7 @@ pub struct OutboundPolicyClient { #[derive(Debug)] struct GrpcHttp { - tx: hyper::client::conn::SendRequest, + tx: hyper::client::conn::http2::SendRequest, } async fn get_policy_controller_pod(client: &kube::Client) -> Result { @@ -338,8 +340,7 @@ impl GrpcHttp { where I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static, { - let (tx, conn) = hyper::client::conn::Builder::new() - .http2_only(true) + let (tx, conn) = hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor) .handshake(io) .await?; tokio::spawn(conn); @@ -350,7 +351,7 @@ impl GrpcHttp { impl hyper::service::Service> for GrpcHttp { type Response = hyper::Response; type Error = hyper::Error; - type Future = hyper::client::conn::ResponseFuture; + type Future = Pin>>>; fn poll_ready( &mut self, @@ -360,6 +361,8 @@ impl hyper::service::Service> for GrpcHttp } fn call(&mut self, req: hyper::Request) -> Self::Future { + use futures::FutureExt; + let (mut parts, body) = req.into_parts(); let mut uri = parts.uri.into_parts(); @@ -371,7 +374,9 @@ impl hyper::service::Service> for GrpcHttp ); parts.uri = hyper::Uri::from_parts(uri).unwrap(); - self.tx.call(hyper::Request::from_parts(parts, body)) + self.tx + .send_request(hyper::Request::from_parts(parts, body)) + .boxed() } } diff --git a/policy-test/src/lib.rs b/policy-test/src/lib.rs index da69d55a10d40..233f3c62f118b 100644 --- a/policy-test/src/lib.rs +++ b/policy-test/src/lib.rs @@ -8,6 +8,8 @@ pub mod grpc; pub mod outbound_api; pub mod web; +mod rt; + use kube::runtime::wait::Condition; use linkerd_policy_controller_k8s_api::{ self as k8s, diff --git a/policy-test/src/rt.rs b/policy-test/src/rt.rs new file mode 100644 index 0000000000000..fc569997aa9bf --- /dev/null +++ b/policy-test/src/rt.rs @@ -0,0 +1,18 @@ +//! HTTP runtime components for Linkerd. + +use hyper::rt::Executor; +use std::future::Future; + +#[derive(Clone, Debug, Default)] +pub struct TokioExecutor; + +impl Executor for TokioExecutor +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + #[inline] + fn execute(&self, f: F) { + tokio::spawn(f); + } +}