Skip to content

Commit

Permalink
wip: update kube and hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
olix0r committed Jan 5, 2025
1 parent 8a98113 commit 5229b3c
Show file tree
Hide file tree
Showing 10 changed files with 570 additions and 576 deletions.
1,002 changes: 494 additions & 508 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ members = [
lto = "thin"

[workspace.dependencies]
k8s-openapi = { version = "0.20", features = ["v1_22"] }
kube = { version = "0.87.1", default-features = false }
kubert = { version = "0.22", default-features = false }
k8s-openapi = { version = "0.24", features = ["v1_31"] }
kube = { version = "0.98", default-features = false }
kubert = { version = "0.23.0-alpha5", default-features = false }

# TODO(ver): Remove this once we update to a proper generated version of the gateway api bindings.
k8s-gateway-api = { git = "https://github.com/linkerd/k8s-gateway-api-rs", features = [
"experimental",
] }
2 changes: 1 addition & 1 deletion policy-controller/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async-trait = "0.1"
http = "0.2"
drain = "0.1"
futures = { version = "0.3", default-features = false }
hyper = { version = "0.14", features = ["http2", "server", "tcp"] }
hyper = { version = "1", features = ["http2", "server"] }
linkerd-policy-controller-core = { path = "../core" }
maplit = "1"
prost-types = "0.12.6"
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
k8s-openapi = { workspace = true }
k8s-gateway-api = { version = "0.16", features = ["experimental"] }
k8s-gateway-api = { workspace = true, features = ["experimental"] }
kube = { workspace = true, default-features = false, features = [
"client",
"derive",
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tracing = "0.1"

[dev-dependencies]
chrono = { version = "0.4", default-features = false }
k8s-openapi = { version = "0.20", features = ["schemars"] }
k8s-openapi = { workspace = true, features = ["schemars"] }
maplit = "1"
tokio-stream = "0.1"
tokio-test = "0.4"
Expand Down
3 changes: 2 additions & 1 deletion policy-controller/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async-trait = "0.1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
k8s-openapi = { workspace = true }
hyper = { version = "0.14", features = ["http1", "http2", "runtime", "server"] }
hyper = { version = "1", features = ["http1", "http2", "server"] }
ipnet = { version = "2", default-features = false }
openssl = { version = "0.10.68", optional = true }
parking_lot = "0.12"
Expand Down Expand Up @@ -56,6 +56,7 @@ features = [
"lease",
"prometheus-client",
"runtime",
"runtime-diagnostics",
"server",
"rustls-tls",
]
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/runtime/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ trait Validate<T> {

// === impl AdmissionService ===

impl hyper::service::Service<Request<Body>> for Admission {
impl hyper::service::Service<Request<hyper::body::Incoming>> for Admission {
type Response = Response<Body>;
type Error = Error;
type Future = future::BoxFuture<'static, Result<Response<Body>, Error>>;
Expand Down
102 changes: 54 additions & 48 deletions policy-controller/runtime/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ use futures::prelude::*;
use k8s::{api::apps::v1::Deployment, Client, ObjectMeta, Resource};
use k8s_openapi::api::coordination::v1 as coordv1;
use kube::{api::PatchParams, runtime::watcher};
use kubert::LeaseManager;
use prometheus_client::registry::Registry;
use std::{net::SocketAddr, sync::Arc};
use tokio::{sync::mpsc, time::Duration};
use tokio::{
sync::{mpsc, watch},
time::Duration,
};
use tonic::transport::Server;
use tracing::{info, info_span, instrument, Instrument};

Expand Down Expand Up @@ -179,18 +181,20 @@ impl Args {

let hostname =
std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
let params = kubert::lease::ClaimParams {
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
};

let lease = init_lease(
runtime.client(),
&control_plane_namespace,
let claims = init_lease(
&runtime,
&policy_deployment_name,
kubert::LeaseParams {
name: LEASE_NAME.to_string(),
namespace: control_plane_namespace.clone(),
claimant: hostname,
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
field_manager: Some("policy-controller".into()),
},
)
.await?;
let (claims, _task) = lease.spawn(hostname.clone(), params).await?;

// Build the status index which will maintain information necessary for
// updating the status field of policy resources.
Expand Down Expand Up @@ -463,60 +467,62 @@ async fn grpc(
Ok(())
}

async fn init_lease(client: Client, ns: &str, deployment_name: &str) -> Result<LeaseManager> {
async fn init_lease<T>(
runtime: &kubert::Runtime<T>,
deployment_name: &str,
params: kubert::LeaseParams,
) -> Result<watch::Receiver<Arc<kubert::lease::Claim>>> {
// Fetch the policy-controller deployment so that we can use it as an owner
// reference of the Lease.
let api = k8s::Api::<Deployment>::namespaced(client.clone(), ns);
let api = k8s::Api::<Deployment>::namespaced(runtime.client(), &params.namespace);
let deployment = api.get(deployment_name).await?;

let api = k8s::Api::namespaced(client, ns);
let params = PatchParams {
field_manager: Some("policy-controller".to_string()),
..Default::default()
let lease = coordv1::Lease {
metadata: ObjectMeta {
name: Some(params.name.clone()),
namespace: Some(params.namespace.clone()),
// Specifying a resource version of "0" means that we will
// only create the Lease if it does not already exist.
resource_version: Some("0".to_string()),
owner_references: Some(vec![deployment.controller_owner_ref(&()).unwrap()]),
labels: Some(
[
(
"linkerd.io/control-plane-component".to_string(),
"destination".to_string(),
),
(
"linkerd.io/control-plane-ns".to_string(),
params.namespace.clone(),
),
]
.into_iter()
.collect(),
),
..Default::default()
},
spec: None,
};
match api
match k8s::Api::<coordv1::Lease>::namespaced(runtime.client(), &params.namespace)
.patch(
LEASE_NAME,
&params,
&kube::api::Patch::Apply(coordv1::Lease {
metadata: ObjectMeta {
name: Some(LEASE_NAME.to_string()),
namespace: Some(ns.to_string()),
// Specifying a resource version of "0" means that we will
// only create the Lease if it does not already exist.
resource_version: Some("0".to_string()),
owner_references: Some(vec![deployment.controller_owner_ref(&()).unwrap()]),
labels: Some(
[
(
"linkerd.io/control-plane-component".to_string(),
"destination".to_string(),
),
("linkerd.io/control-plane-ns".to_string(), ns.to_string()),
]
.into_iter()
.collect(),
),
..Default::default()
},
spec: None,
}),
&PatchParams {
field_manager: params.field_manager.clone().map(Into::into),
..Default::default()
},
&kube::api::Patch::Apply(lease),
)
.await
{
Ok(lease) => tracing::info!(?lease, "Created Lease resource"),
Err(k8s::Error::Api(_)) => tracing::debug!("Lease already exists, no need to create it"),
Err(error) => {
tracing::error!(%error, "Failed to create Lease resource");
return Err(error.into());
}
};
// Create the lease manager used for trying to claim the policy
// controller write lease.
// todo: Do we need to use LeaseManager::field_manager here?
kubert::lease::LeaseManager::init(api, LEASE_NAME)
.await
.map_err(Into::into)

let (claim, _task) = runtime.spawn_lease(params).await?;
Ok(claim)
}

async fn api_resource_exists<T>(client: &Client) -> bool
Expand Down
4 changes: 2 additions & 2 deletions policy-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ publish = false

[dependencies]
anyhow = "1"
hyper = { version = "0.14", features = ["client", "http2"] }
hyper = { version = "1", features = ["client", "http2"] }
futures = { version = "0.3", default-features = false }
ipnet = "2"
k8s-gateway-api = "0.16"
k8s-gateway-api = { workspace = true }
k8s-openapi = { workspace = true }
linkerd-policy-controller-core = { path = "../policy-controller/core" }
linkerd-policy-controller-k8s-api = { path = "../policy-controller/k8s/api" }
Expand Down
16 changes: 6 additions & 10 deletions policy-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub async fn create_ready_pod(client: &kube::Client, pod: k8s::Pod) -> k8s::Pod
ip = %pod
.status.as_ref().expect("pod must have a status")
.pod_ips.as_ref().unwrap()[0]
.ip.as_deref().expect("pod ip must be set"),
.ip,
containers = ?pod
.spec.as_ref().expect("pod must have a spec")
.containers.iter().map(|c| &*c.name).collect::<Vec<_>>(),
Expand Down Expand Up @@ -755,15 +755,11 @@ pub async fn await_service_account(client: &kube::Client, ns: &str, name: &str)
.expect("serviceaccounts watch must not fail");
tracing::info!(?ev);
match ev {
kube::runtime::watcher::Event::Restarted(sas) => {
if sas.iter().any(|sa| sa.name_unchecked() == name) {
return;
}
}
kube::runtime::watcher::Event::Applied(sa) => {
if sa.name_unchecked() == name {
return;
}
kube::runtime::watcher::Event::InitApply(sa)
| kube::runtime::watcher::Event::Apply(sa)
if sa.name_unchecked() == name =>
{
return
}
_ => {}
}
Expand Down

0 comments on commit 5229b3c

Please sign in to comment.