diff --git a/README.md b/README.md index 4753cd387..9a64c66dc 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ For real world projects see [ADOPTERS](https://kube.rs/adopters/). ## Api -The [`Api`](https://docs.rs/kube/*/kube/struct.Api.html) is what interacts with Kubernetes resources, and is generic over [`Resource`](https://docs.rs/kube/*/kube/trait.Resource.html): +The [`Api`](https://docs.rs/kube/latest/kube/struct.Api.html) is what interacts with Kubernetes resources, and is generic over [`Resource`](https://docs.rs/kube/latest/kube/trait.Resource.html): ```rust use k8s_openapi::api::core::v1::Pod; @@ -102,7 +102,7 @@ A streaming interface (similar to informers) that presents [`watcher::Event`](ht ```rust let api = Api::::default_namespaced(client); -let stream = watcher(api, Config::default()).applied_objects(); +let stream = watcher(api, Config::default()).default_backoff().applied_objects(); ``` This now gives a continual stream of events and you do not need to care about the watch having to restart, or connections dropping. @@ -113,6 +113,7 @@ while let Some(event) = stream.try_next().await? { } ``` + Note the base items from a `watcher` stream are an abstraction above the native `WatchEvent` to allow for store buffering. If you are following along to "see what changed", you can use utilities from [`WatchStreamExt`](https://docs.rs/kube/latest/kube/runtime/trait.WatchStreamExt.html), such as `applied_objects` to get a more conventional stream. ## Reflectors diff --git a/deny.toml b/deny.toml index 5a67b9755..0e5655f50 100644 --- a/deny.toml +++ b/deny.toml @@ -64,6 +64,10 @@ multiple-versions = "deny" [[bans.skip]] name = "rustls-native-certs" +[[bans.skip]] +# blocked on us swapping out serde_yaml +name = "hashbrown" + [[bans.skip]] # base64 did some annoying breaking changes name = "base64" diff --git a/kube-runtime/src/utils/event_flatten.rs b/kube-runtime/src/utils/event_decode.rs similarity index 90% rename from kube-runtime/src/utils/event_flatten.rs rename to kube-runtime/src/utils/event_decode.rs index 489b2103b..2a0085120 100644 --- a/kube-runtime/src/utils/event_flatten.rs +++ b/kube-runtime/src/utils/event_decode.rs @@ -9,17 +9,17 @@ use pin_project::pin_project; #[pin_project] /// Stream returned by the [`applied_objects`](super::WatchStreamExt::applied_objects) and [`touched_objects`](super::WatchStreamExt::touched_objects) method. #[must_use = "streams do nothing unless polled"] -pub struct EventFlatten { +pub struct EventDecode { #[pin] stream: St, emit_deleted: bool, } -impl>, K> EventFlatten { +impl>, K> EventDecode { pub(super) fn new(stream: St, emit_deleted: bool) -> Self { Self { stream, emit_deleted } } } -impl Stream for EventFlatten +impl Stream for EventDecode where St: Stream, Error>>, { @@ -50,11 +50,11 @@ where pub(crate) mod tests { use std::{pin::pin, task::Poll}; - use super::{Error, Event, EventFlatten}; + use super::{Error, Event, EventDecode}; use futures::{poll, stream, StreamExt}; #[tokio::test] - async fn watches_applies_uses_correct_eventflattened_stream() { + async fn watches_applies_uses_correct_stream() { let data = stream::iter([ Ok(Event::Apply(0)), Ok(Event::Apply(1)), @@ -65,7 +65,7 @@ pub(crate) mod tests { Err(Error::NoResourceVersion), Ok(Event::Apply(2)), ]); - let mut rx = pin!(EventFlatten::new(data, false)); + let mut rx = pin!(EventDecode::new(data, false)); assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0))))); assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1))))); // NB: no Deleted events here diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 2ae546bc8..b8fd342b7 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -2,7 +2,7 @@ mod backoff_reset_timer; pub(crate) mod delayed_init; -mod event_flatten; +mod event_decode; mod event_modify; #[cfg(feature = "unstable-runtime-predicates")] mod predicate; mod reflect; @@ -10,13 +10,19 @@ mod stream_backoff; mod watch_ext; pub use backoff_reset_timer::ResetTimerBackoff; -pub use event_flatten::EventFlatten; +pub use event_decode::EventDecode; pub use event_modify::EventModify; #[cfg(feature = "unstable-runtime-predicates")] pub use predicate::{predicates, Predicate, PredicateFilter}; pub use reflect::Reflect; pub use stream_backoff::StreamBackoff; pub use watch_ext::WatchStreamExt; +/// Deprecated type alias for `EventDecode` +#[deprecated( + since = "0.96.0", + note = "renamed to by `EventDecode`. This alias will be removed in 0.100.0." +)] +pub use EventDecode as EventFlatten; use futures::{ stream::{self, Peekable}, diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 0b74c05c1..94e4cbe16 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -1,7 +1,7 @@ #[cfg(feature = "unstable-runtime-predicates")] use crate::utils::predicate::{Predicate, PredicateFilter}; use crate::{ - utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff}, + utils::{event_decode::EventDecode, event_modify::EventModify, stream_backoff::StreamBackoff}, watcher, }; use kube_client::Resource; @@ -33,24 +33,24 @@ pub trait WatchStreamExt: Stream { StreamBackoff::new(self, b) } - /// Flatten a [`watcher()`] stream into a stream of applied objects + /// Decode a [`watcher()`] stream into a stream of applied objects /// /// All Added/Modified events are passed through, and critical errors bubble up. - fn applied_objects(self) -> EventFlatten + fn applied_objects(self) -> EventDecode where Self: Stream, watcher::Error>> + Sized, { - EventFlatten::new(self, false) + EventDecode::new(self, false) } - /// Flatten a [`watcher()`] stream into a stream of touched objects + /// Decode a [`watcher()`] stream into a stream of touched objects /// /// All Added/Modified/Deleted events are passed through, and critical errors bubble up. - fn touched_objects(self) -> EventFlatten + fn touched_objects(self) -> EventDecode where Self: Stream, watcher::Error>> + Sized, { - EventFlatten::new(self, true) + EventDecode::new(self, true) } /// Modify elements of a [`watcher()`] stream. @@ -88,7 +88,7 @@ pub trait WatchStreamExt: Stream { EventModify::new(self, f) } - /// Filter out a flattened stream on [`predicates`](crate::predicates). + /// Filter a stream based on on [`predicates`](crate::predicates). /// /// This will filter out repeat calls where the predicate returns the same result. /// Common use case for this is to avoid repeat events for status updates diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 33157cbde..b7ef1b569 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -71,7 +71,10 @@ impl Event { /// /// `Deleted` objects are ignored, all objects mentioned by `Restarted` events are /// emitted individually. - #[deprecated(since = "0.92.0", note = "unnecessary to flatten a single object")] + #[deprecated( + since = "0.92.0", + note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0." + )] pub fn into_iter_applied(self) -> impl Iterator { match self { Self::Apply(obj) | Self::InitApply(obj) => Some(obj), @@ -85,7 +88,10 @@ impl Event { /// Note that `Deleted` events may be missed when restarting the stream. Use finalizers /// or owner references instead if you care about cleaning up external resources after /// deleted objects. - #[deprecated(since = "0.92.0", note = "unnecessary to flatten a single object")] + #[deprecated( + since = "0.92.0", + note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0." + )] pub fn into_iter_touched(self) -> impl Iterator { match self { Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => Some(obj), @@ -710,8 +716,8 @@ where /// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat)) /// will terminate eagerly as soon as they receive an [`Err`]. /// -/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`]. -/// Direct users may want to flatten composite events via [`WatchStreamExt`]: +/// The events are intended to provide a safe input interface for a state store like a [`reflector`]. +/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs. /// /// ```no_run /// use kube::{ @@ -773,8 +779,8 @@ pub fn watcher( /// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat)) /// will terminate eagerly as soon as they receive an [`Err`]. /// -/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`]. -/// Direct users may want to flatten composite events via [`WatchStreamExt`]: +/// The events are intended to provide a safe input interface for a state store like a [`reflector`]. +/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs. /// /// ```no_run /// use kube::{