diff --git a/Cargo.toml b/Cargo.toml index 906fece31..a6633ac7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2" async-broadcast = "0.7.0" async-stream = "0.3.5" async-trait = "0.1.64" -backoff = "0.4.0" +backon = "1.3" base64 = "0.22.1" bytes = "1.1.0" chrono = { version = "0.4.34", default-features = false } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index f260bc73b..b5bbe2c7c 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] } hyper = { workspace = true, features = ["client", "http1"] } hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] } thiserror.workspace = true -backoff.workspace = true +backon.workspace = true clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 959601641..09975cf5f 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -43,7 +43,7 @@ json-patch.workspace = true jsonptr.workspace = true serde_json.workspace = true thiserror.workspace = true -backoff.workspace = true +backon.workspace = true async-trait.workspace = true hashbrown.workspace = true k8s-openapi.workspace = true diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index e701b1d1b..8a8f7fc3a 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -8,10 +8,11 @@ use crate::{ ObjectRef, }, scheduler::{debounced_scheduler, ScheduleRequest}, - utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt}, + utils::{ + trystream_try_via, Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, + }, watcher::{self, metadata_watcher, watcher, DefaultBackoff}, }; -use backoff::backoff::Backoff; use educe::Educe; use futures::{ channel, @@ -915,7 +916,7 @@ where /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions, /// but can be overridden by calling this method. #[must_use] - pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self { + pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self { self.trigger_backoff = Box::new(backoff); self } diff --git a/kube-runtime/src/utils/backoff_reset_timer.rs b/kube-runtime/src/utils/backoff_reset_timer.rs index 1c09a5344..e18817c24 100644 --- a/kube-runtime/src/utils/backoff_reset_timer.rs +++ b/kube-runtime/src/utils/backoff_reset_timer.rs @@ -1,36 +1,40 @@ use std::time::{Duration, Instant}; -use backoff::{backoff::Backoff, Clock, SystemClock}; +pub trait Backoff: Iterator + Send + Sync + Unpin { + /// Resets the internal state to the initial value. + fn reset(&mut self); +} + +impl Backoff for Box { + fn reset(&mut self) { + let this: &mut B = self; + this.reset() + } +} /// A [`Backoff`] wrapper that resets after a fixed duration has elapsed. -pub struct ResetTimerBackoff { +pub struct ResetTimerBackoff { backoff: B, - clock: C, last_backoff: Option, reset_duration: Duration, } impl ResetTimerBackoff { pub fn new(backoff: B, reset_duration: Duration) -> Self { - Self::new_with_custom_clock(backoff, reset_duration, SystemClock {}) - } -} - -impl ResetTimerBackoff { - fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self { Self { backoff, - clock, last_backoff: None, reset_duration, } } } -impl Backoff for ResetTimerBackoff { - fn next_backoff(&mut self) -> Option { +impl Iterator for ResetTimerBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { if let Some(last_backoff) = self.last_backoff { - if self.clock.now() > last_backoff + self.reset_duration { + if tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration { tracing::debug!( ?last_backoff, reset_duration = ?self.reset_duration, @@ -39,48 +43,40 @@ impl Backoff for ResetTimerBackoff { self.backoff.reset(); } } - self.last_backoff = Some(self.clock.now()); - self.backoff.next_backoff() + self.last_backoff = Some(tokio::time::Instant::now().into_std()); + self.backoff.next() } +} +impl Backoff for ResetTimerBackoff { fn reset(&mut self) { - // Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires. + self.backoff.reset(); } } #[cfg(test)] mod tests { - use backoff::{backoff::Backoff, Clock}; use tokio::time::advance; use super::ResetTimerBackoff; use crate::utils::stream_backoff::tests::LinearBackoff; - use std::time::{Duration, Instant}; + use std::time::Duration; #[tokio::test] async fn should_reset_when_timer_expires() { tokio::time::pause(); - let mut backoff = ResetTimerBackoff::new_with_custom_clock( + let mut backoff = ResetTimerBackoff::new( LinearBackoff::new(Duration::from_secs(2)), Duration::from_secs(60), - TokioClock, ); - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4))); + assert_eq!(backoff.next(), Some(Duration::from_secs(4))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6))); + assert_eq!(backoff.next(), Some(Duration::from_secs(6))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); - } - - struct TokioClock; - - impl Clock for TokioClock { - fn now(&self) -> Instant { - tokio::time::Instant::now().into_std() - } + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); } } diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 74cc7cf2f..e2722b0fa 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -9,7 +9,7 @@ mod reflect; mod stream_backoff; mod watch_ext; -pub use backoff_reset_timer::ResetTimerBackoff; +pub use backoff_reset_timer::{Backoff, ResetTimerBackoff}; pub use event_decode::EventDecode; pub use event_modify::EventModify; pub use predicate::{predicates, Predicate, PredicateFilter}; diff --git a/kube-runtime/src/utils/stream_backoff.rs b/kube-runtime/src/utils/stream_backoff.rs index 01c6c4292..a23a3461e 100644 --- a/kube-runtime/src/utils/stream_backoff.rs +++ b/kube-runtime/src/utils/stream_backoff.rs @@ -1,10 +1,11 @@ use std::{future::Future, pin::Pin, task::Poll}; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; use pin_project::pin_project; use tokio::time::{sleep, Instant, Sleep}; +use crate::utils::Backoff; + /// Applies a [`Backoff`] policy to a [`Stream`] /// /// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The @@ -71,7 +72,7 @@ impl Stream for StreamBackoff { let next_item = this.stream.try_poll_next(cx); match &next_item { Poll::Ready(Some(Err(_))) => { - if let Some(backoff_duration) = this.backoff.next_backoff() { + if let Some(backoff_duration) = this.backoff.next() { let backoff_sleep = sleep(backoff_duration); tracing::debug!( deadline = ?backoff_sleep.deadline(), @@ -98,16 +99,54 @@ impl Stream for StreamBackoff { pub(crate) mod tests { use std::{pin::pin, task::Poll, time::Duration}; + use crate::utils::Backoff; + use super::StreamBackoff; - use backoff::backoff::Backoff; + use backon::BackoffBuilder; use futures::{channel::mpsc, poll, stream, StreamExt}; + pub struct ConstantBackoff { + inner: backon::ConstantBackoff, + delay: Duration, + max_times: usize, + } + + impl ConstantBackoff { + pub fn new(delay: Duration, max_times: usize) -> Self { + Self { + inner: backon::ConstantBuilder::default() + .with_delay(delay) + .with_max_times(max_times) + .build(), + delay, + max_times, + } + } + } + + impl Iterator for ConstantBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + self.inner.next() + } + } + + impl Backoff for ConstantBackoff { + fn reset(&mut self) { + self.inner = backon::ConstantBuilder::default() + .with_delay(self.delay) + .with_max_times(self.max_times) + .build(); + } + } + #[tokio::test] async fn stream_should_back_off() { tokio::time::pause(); let tick = Duration::from_secs(1); let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); - let mut rx = pin!(StreamBackoff::new(rx, backoff::backoff::Constant::new(tick))); + let mut rx = pin!(StreamBackoff::new(rx, ConstantBackoff::new(tick, 10))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2)))); @@ -149,16 +188,27 @@ pub(crate) mod tests { #[tokio::test] async fn backoff_should_close_when_requested() { assert_eq!( - StreamBackoff::new( - stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), - backoff::backoff::Stop {} - ) - .collect::>() - .await, + StreamBackoff::new(stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), StoppedBackoff {}) + .collect::>() + .await, vec![Ok(0), Ok(1), Err(2)] ); } + struct StoppedBackoff; + + impl Backoff for StoppedBackoff { + fn reset(&mut self) {} + } + + impl Iterator for StoppedBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + None + } + } + /// Dynamic backoff policy that is still deterministic and testable pub struct LinearBackoff { interval: Duration, @@ -174,12 +224,16 @@ pub(crate) mod tests { } } - impl Backoff for LinearBackoff { - fn next_backoff(&mut self) -> Option { + impl Iterator for LinearBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { self.current_duration += self.interval; Some(self.current_duration) } + } + impl Backoff for LinearBackoff { fn reset(&mut self) { self.current_duration = Duration::ZERO } diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 7ed636201..241871837 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -9,10 +9,12 @@ use crate::{ }; use kube_client::Resource; -use crate::{reflector::store::Writer, utils::Reflect}; +use crate::{ + reflector::store::Writer, + utils::{Backoff, Reflect}, +}; use crate::watcher::DefaultBackoff; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 8a649ec17..953838dac 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -2,9 +2,10 @@ //! //! See [`watcher`] for the primary entry point. -use crate::utils::ResetTimerBackoff; +use crate::utils::{Backoff, ResetTimerBackoff}; + use async_trait::async_trait; -use backoff::{backoff::Backoff, ExponentialBackoff}; +use backon::BackoffBuilder; use educe::Educe; use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ @@ -497,11 +498,14 @@ where { match state { State::Empty => match wc.initial_list_strategy { - InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage { - continue_token: None, - objects: VecDeque::default(), - last_bookmark: None, - }), + InitialListStrategy::ListWatch => ( + Some(Ok(Event::Init)), + State::InitPage { + continue_token: None, + objects: VecDeque::default(), + last_bookmark: None, + }, + ), InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await { Ok(stream) => (None, State::InitialWatch { stream }), Err(err) => { @@ -520,11 +524,14 @@ where last_bookmark, } => { if let Some(next) = objects.pop_front() { - return (Some(Ok(Event::InitApply(next))), State::InitPage { - continue_token, - objects, - last_bookmark, - }); + return ( + Some(Ok(Event::InitApply(next))), + State::InitPage { + continue_token, + objects, + last_bookmark, + }, + ); } // check if we need to perform more pages if continue_token.is_none() { @@ -544,11 +551,14 @@ where } // Buffer page here, causing us to return to this enum branch (State::InitPage) // until the objects buffer has drained - (None, State::InitPage { - continue_token, - objects: list.items.into_iter().collect(), - last_bookmark, - }) + ( + None, + State::InitPage { + continue_token, + objects: list.items.into_iter().collect(), + last_bookmark, + }, + ) } Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { @@ -574,10 +584,13 @@ where Some(Ok(WatchEvent::Bookmark(bm))) => { let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); if marks_initial_end { - (Some(Ok(Event::InitDone)), State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }) + ( + Some(Ok(Event::InitDone)), + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ) } else { (None, State::InitialWatch { stream }) } @@ -609,19 +622,23 @@ where } State::InitListed { resource_version } => { match api.watch(&wc.to_watch_params(), &resource_version).await { - Ok(stream) => (None, State::Watching { - resource_version, - stream, - }), + Ok(stream) => ( + None, + State::Watching { + resource_version, + stream, + }, + ), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { warn!("watch initlist error with 403: {err:?}"); } else { debug!("watch initlist error: {err:?}"); } - (Some(Err(Error::WatchStartFailed(err))), State::InitListed { - resource_version, - }) + ( + Some(Err(Error::WatchStartFailed(err))), + State::InitListed { resource_version }, + ) } } } @@ -634,10 +651,13 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Apply(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Apply(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } Some(Ok(WatchEvent::Deleted(obj))) => { @@ -645,16 +665,22 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Delete(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Delete(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } - Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }), + Some(Ok(WatchEvent::Bookmark(bm))) => ( + None, + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ), Some(Ok(WatchEvent::Error(err))) => { // HTTP GONE, means we have desynced and need to start over and re-list :( let new_state = if err.code == 410 { @@ -678,10 +704,13 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(Error::WatchFailed(err))), State::Watching { - resource_version, - stream, - }) + ( + Some(Err(Error::WatchFailed(err))), + State::Watching { + resource_version, + stream, + }, + ) } None => (None, State::InitListed { resource_version }), }, @@ -882,6 +911,52 @@ pub fn watch_object Self { + Self { + inner: backon::ExponentialBuilder::default() + .with_min_delay(min_delay) + .with_max_delay(max_delay) + .with_factor(factor) + .with_jitter() + .build(), + min_delay, + max_delay, + factor, + enable_jitter, + } + } +} + +impl Backoff for ExponentialBackoff { + fn reset(&mut self) { + let mut builder = backon::ExponentialBuilder::default() + .with_min_delay(self.min_delay) + .with_max_delay(self.max_delay) + .with_factor(self.factor); + if self.enable_jitter { + builder = builder.with_jitter(); + } + self.inner = builder.build(); + } +} + +impl Iterator for ExponentialBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + self.inner.next() + } +} + /// Default watcher backoff inspired by Kubernetes' client-go. /// /// The parameters currently optimize for being kind to struggling apiservers. @@ -898,24 +973,22 @@ type Strategy = ResetTimerBackoff; impl Default for DefaultBackoff { fn default() -> Self { Self(ResetTimerBackoff::new( - backoff::ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(800)) - .with_max_interval(Duration::from_secs(30)) - .with_randomization_factor(1.0) - .with_multiplier(2.0) - .with_max_elapsed_time(None) - .build(), + ExponentialBackoff::new(Duration::from_millis(800), Duration::from_secs(30), 2.0, true), Duration::from_secs(120), )) } } -impl Backoff for DefaultBackoff { - fn next_backoff(&mut self) -> Option { - self.0.next_backoff() +impl Iterator for DefaultBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + self.0.next() } +} +impl Backoff for DefaultBackoff { fn reset(&mut self) { - self.0.reset() + self.0.reset(); } }