Skip to content

Commit

Permalink
build(deps): bump hyper from 0.3 to 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
olix0r committed Jan 6, 2025
1 parent 5229b3c commit 347be22
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 203 deletions.
256 changes: 84 additions & 172 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@ k8s-openapi = { version = "0.24", features = ["v1_31"] }
kube = { version = "0.98", default-features = false }
kubert = { version = "0.23.0-alpha5", default-features = false }

[workspace.dependencies.k8s-gateway-api]
# TODO(ver): Remove this once we update to a proper generated version of the gateway api bindings.
k8s-gateway-api = { git = "https://github.com/linkerd/k8s-gateway-api-rs", features = [
"experimental",
] }
git = "https://github.com/linkerd/k8s-gateway-api-rs"
features = ["experimental"]

[workspace.dependencies.linkerd2-proxy-api]
git = "https://github.com/linkerd/linkerd2-proxy-api"
branch = "ver/deps-http"
features = [
"inbound",
"outbound",
]
2 changes: 1 addition & 1 deletion policy-controller/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ anyhow = "1"
async-trait = "0.1"
chrono = { version = "0.4.39", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"] }
http = "0.2"
http = "1"
ipnet = "2"
regex = "1"
8 changes: 4 additions & 4 deletions policy-controller/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ publish = false
[dependencies]
async-stream = "0.3"
async-trait = "0.1"
http = "0.2"
http = "1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
hyper = { version = "1", features = ["http2", "server"] }
linkerd-policy-controller-core = { path = "../core" }
maplit = "1"
prost-types = "0.12.6"
prost-types = "0.13"
tokio = { version = "1", features = ["macros"] }
tonic = { version = "0.10", default-features = false }
tonic = { version = "0.12", default-features = false }
tracing = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

[dependencies.linkerd2-proxy-api]
version = "0.15"
workspace = true
features = ["inbound", "outbound"]
2 changes: 1 addition & 1 deletion policy-controller/grpc/src/outbound/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub(crate) fn protocol(
}),
http1: Some(outbound::proxy_protocol::Http1 {
routes: routes.clone(),
failure_accrual: accrual.clone(),
failure_accrual: accrual,
}),
http2: Some(outbound::proxy_protocol::Http2 {
routes,
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ ahash = "0.8"
anyhow = "1"
chrono = { version = "0.4.39", default-features = false }
futures = { version = "0.3", default-features = false }
http = "0.2"
http = "1"
kube = { workspace = true, default-features = false, features = [
"client",
"derive",
Expand Down
5 changes: 4 additions & 1 deletion policy-controller/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ rustls-tls = ["kube/rustls-tls"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
bytes = "1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
k8s-openapi = { workspace = true }
http-body-util = "0.1"
hyper = { version = "1", features = ["http1", "http2", "server"] }
ipnet = { version = "2", default-features = false }
openssl = { version = "0.10.68", optional = true }
Expand All @@ -28,6 +30,7 @@ serde = "1"
serde_json = "1"
thiserror = "2"
tokio-stream = { version = "0.1", features = ["sync"] }
tower = "0.4"
tracing = "0.1"
regex = "1"

Expand Down Expand Up @@ -66,6 +69,6 @@ version = "1"
features = ["macros", "parking_lot", "rt", "rt-multi-thread", "signal"]

[dependencies.tonic]
version = "0.10"
version = "0.12"
default-features = false
features = ["transport"]
23 changes: 15 additions & 8 deletions policy-controller/runtime/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use crate::k8s::policy::{
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use futures::future;
use hyper::{body::Buf, http, Body, Request, Response};
use http_body_util::BodyExt;
use hyper::{http, Request, Response};
use k8s_openapi::api::core::v1::{Namespace, ServiceAccount};
use kube::{core::DynamicObject, Resource, ResourceExt};
use linkerd_policy_controller_core as core;
use linkerd_policy_controller_k8s_api::gateway::{self as k8s_gateway_api, GrpcRoute};
use linkerd_policy_controller_k8s_index::{self as index, outbound::index as outbound_index};
use serde::de::DeserializeOwned;
use std::{collections::BTreeMap, task};
use std::collections::BTreeMap;
use thiserror::Error;
use tracing::{debug, info, trace, warn};

Expand Down Expand Up @@ -49,31 +50,37 @@ trait Validate<T> {
) -> Result<()>;
}

type Body = http_body_util::Full<bytes::Bytes>;

// === impl AdmissionService ===

impl hyper::service::Service<Request<hyper::body::Incoming>> for Admission {
impl tower::Service<Request<hyper::body::Incoming>> for Admission {
type Response = Response<Body>;
type Error = Error;
type Future = future::BoxFuture<'static, Result<Response<Body>, Error>>;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll<Result<(), Error>> {
task::Poll::Ready(Ok(()))
fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<hyper::body::Incoming>) -> Self::Future {
trace!(?req);
if req.method() != http::Method::POST || req.uri().path() != "/" {
return Box::pin(future::ok(
Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body(Body::empty())
.body(Body::default())
.expect("not found response must be valid"),
));
}

let admission = self.clone();
Box::pin(async move {
let bytes = hyper::body::aggregate(req.into_body()).await?;
use bytes::Buf;
let bytes = req.into_body().collect().await?.to_bytes();
let review: Review = match serde_json::from_reader(bytes.reader()) {
Ok(review) => review,
Err(error) => {
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/runtime/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Args {
kubert::LeaseParams {
name: LEASE_NAME.to_string(),
namespace: control_plane_namespace.clone(),
claimant: hostname,
claimant: hostname.clone(),
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
field_manager: Some("policy-controller".into()),
Expand Down
8 changes: 6 additions & 2 deletions policy-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ publish = false

[dependencies]
anyhow = "1"
bytes = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["client", "http2"] }
hyper-util = { version = "0.1" }
futures = { version = "0.3", default-features = false }
ipnet = "2"
k8s-gateway-api = { workspace = true }
Expand All @@ -20,8 +23,9 @@ rand = "0.8"
serde = "1"
serde_json = "1"
schemars = "0.8"
tonic = { version = "0.10", default-features = false }
tonic = { version = "0.12", default-features = false }
tokio = { version = "1", features = ["macros", "rt"] }
tower = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand All @@ -31,7 +35,7 @@ default-features = false
features = ["client", "openssl-tls", "runtime", "ws"]

[dependencies.linkerd2-proxy-api]
version = "0.15"
workspace = true
features = ["inbound", "outbound"]

[dev-dependencies]
Expand Down
23 changes: 14 additions & 9 deletions policy-test/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! forwarding to connect to a running instance.
use anyhow::Result;
use futures::{future, prelude::*};
pub use linkerd2_proxy_api::*;
use linkerd2_proxy_api::{
inbound::inbound_server_policies_client::InboundServerPoliciesClient,
Expand Down Expand Up @@ -105,7 +106,7 @@ pub struct OutboundPolicyClient {

#[derive(Debug)]
struct GrpcHttp {
tx: hyper::client::conn::SendRequest<tonic::body::BoxBody>,
tx: hyper::client::conn::http2::SendRequest<tonic::body::BoxBody>,
}

async fn get_policy_controller_pod(client: &kube::Client) -> Result<String> {
Expand Down Expand Up @@ -338,19 +339,21 @@ impl GrpcHttp {
where
I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static,
{
let (tx, conn) = hyper::client::conn::Builder::new()
.http2_only(true)
.handshake(io)
.await?;
let (tx, conn) =
hyper::client::conn::http2::Builder::new(hyper_util::rt::TokioExecutor::new())
.handshake(hyper_util::rt::TokioIo::new(io))
.await?;
tokio::spawn(conn);
Ok(Self { tx })
}
}

impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
type Response = hyper::Response<hyper::Body>;
type Body = hyper::body::Incoming;

impl tower::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
type Response = hyper::Response<Body>;
type Error = hyper::Error;
type Future = hyper::client::conn::ResponseFuture;
type Future = future::BoxFuture<'static, Result<hyper::Response<Body>, hyper::Error>>;

fn poll_ready(
&mut self,
Expand All @@ -371,7 +374,9 @@ impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp
);
parts.uri = hyper::Uri::from_parts(uri).unwrap();

self.tx.call(hyper::Request::from_parts(parts, body))
self.tx
.send_request(hyper::Request::from_parts(parts, body))
.boxed()
}
}

Expand Down

0 comments on commit 347be22

Please sign in to comment.