Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: rename references from Flatten to Decode #1520

Merged
merged 11 commits into from
Oct 4, 2024
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ For real world projects see [ADOPTERS](https://kube.rs/adopters/).

## Api

The [`Api`](https://docs.rs/kube/*/kube/struct.Api.html) is what interacts with Kubernetes resources, and is generic over [`Resource`](https://docs.rs/kube/*/kube/trait.Resource.html):
The [`Api`](https://docs.rs/kube/latest/kube/struct.Api.html) is what interacts with Kubernetes resources, and is generic over [`Resource`](https://docs.rs/kube/latest/kube/trait.Resource.html):

```rust
use k8s_openapi::api::core::v1::Pod;
Expand Down Expand Up @@ -102,7 +102,7 @@ A streaming interface (similar to informers) that presents [`watcher::Event`](ht

```rust
let api = Api::<Pod>::default_namespaced(client);
let stream = watcher(api, Config::default()).applied_objects();
let stream = watcher(api, Config::default()).default_backoff().applied_objects();
```

This now gives a continual stream of events and you do not need to care about the watch having to restart, or connections dropping.
Expand All @@ -113,6 +113,7 @@ while let Some(event) = stream.try_next().await? {
}
```


Note the base items from a `watcher` stream are an abstraction above the native `WatchEvent` to allow for store buffering. If you are following along to "see what changed", you can use utilities from [`WatchStreamExt`](https://docs.rs/kube/latest/kube/runtime/trait.WatchStreamExt.html), such as `applied_objects` to get a more conventional stream.

## Reflectors
Expand Down
4 changes: 4 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ multiple-versions = "deny"
[[bans.skip]]
name = "rustls-native-certs"

[[bans.skip]]
# blocked on us swapping out serde_yaml
name = "hashbrown"

[[bans.skip]]
# base64 did some annoying breaking changes
name = "base64"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use pin_project::pin_project;
#[pin_project]
/// Stream returned by the [`applied_objects`](super::WatchStreamExt::applied_objects) and [`touched_objects`](super::WatchStreamExt::touched_objects) method.
#[must_use = "streams do nothing unless polled"]
pub struct EventFlatten<St> {
pub struct EventDecode<St> {
#[pin]
stream: St,
emit_deleted: bool,
}
impl<St: TryStream<Ok = Event<K>>, K> EventFlatten<St> {
impl<St: TryStream<Ok = Event<K>>, K> EventDecode<St> {
pub(super) fn new(stream: St, emit_deleted: bool) -> Self {
Self { stream, emit_deleted }
}
}
impl<St, K> Stream for EventFlatten<St>
impl<St, K> Stream for EventDecode<St>
where
St: Stream<Item = Result<Event<K>, Error>>,
{
Expand Down Expand Up @@ -50,11 +50,11 @@ where
pub(crate) mod tests {
use std::{pin::pin, task::Poll};

use super::{Error, Event, EventFlatten};
use super::{Error, Event, EventDecode};
use futures::{poll, stream, StreamExt};

#[tokio::test]
async fn watches_applies_uses_correct_eventflattened_stream() {
async fn watches_applies_uses_correct_stream() {
let data = stream::iter([
Ok(Event::Apply(0)),
Ok(Event::Apply(1)),
Expand All @@ -65,7 +65,7 @@ pub(crate) mod tests {
Err(Error::NoResourceVersion),
Ok(Event::Apply(2)),
]);
let mut rx = pin!(EventFlatten::new(data, false));
let mut rx = pin!(EventDecode::new(data, false));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))));
// NB: no Deleted events here
Expand Down
10 changes: 8 additions & 2 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,27 @@

mod backoff_reset_timer;
pub(crate) mod delayed_init;
mod event_flatten;
mod event_decode;
mod event_modify;
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
mod reflect;
mod stream_backoff;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
pub use event_decode::EventDecode;
pub use event_modify::EventModify;
#[cfg(feature = "unstable-runtime-predicates")]
pub use predicate::{predicates, Predicate, PredicateFilter};
pub use reflect::Reflect;
pub use stream_backoff::StreamBackoff;
pub use watch_ext::WatchStreamExt;
/// Deprecated type alias for `EventDecode`
#[deprecated(
since = "0.96.0",
note = "renamed to by `EventDecode`. This alias will be removed in 0.100.0."
)]
pub use EventDecode as EventFlatten;

use futures::{
stream::{self, Peekable},
Expand Down
16 changes: 8 additions & 8 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "unstable-runtime-predicates")]
use crate::utils::predicate::{Predicate, PredicateFilter};
use crate::{
utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff},
utils::{event_decode::EventDecode, event_modify::EventModify, stream_backoff::StreamBackoff},
watcher,
};
use kube_client::Resource;
Expand Down Expand Up @@ -33,24 +33,24 @@
StreamBackoff::new(self, b)
}

/// Flatten a [`watcher()`] stream into a stream of applied objects
/// Decode a [`watcher()`] stream into a stream of applied objects
///
/// All Added/Modified events are passed through, and critical errors bubble up.
fn applied_objects<K>(self) -> EventFlatten<Self>
fn applied_objects<K>(self) -> EventDecode<Self>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
EventFlatten::new(self, false)
EventDecode::new(self, false)
}

/// Flatten a [`watcher()`] stream into a stream of touched objects
/// Decode a [`watcher()`] stream into a stream of touched objects
///
/// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
fn touched_objects<K>(self) -> EventFlatten<Self>
fn touched_objects<K>(self) -> EventDecode<Self>

Check warning on line 49 in kube-runtime/src/utils/watch_ext.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/utils/watch_ext.rs#L49

Added line #L49 was not covered by tests
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
EventFlatten::new(self, true)
EventDecode::new(self, true)

Check warning on line 53 in kube-runtime/src/utils/watch_ext.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/utils/watch_ext.rs#L53

Added line #L53 was not covered by tests
}

/// Modify elements of a [`watcher()`] stream.
Expand Down Expand Up @@ -88,7 +88,7 @@
EventModify::new(self, f)
}

/// Filter out a flattened stream on [`predicates`](crate::predicates).
/// Filter a stream based on on [`predicates`](crate::predicates).
///
/// This will filter out repeat calls where the predicate returns the same result.
/// Common use case for this is to avoid repeat events for status updates
Expand Down
18 changes: 12 additions & 6 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ impl<K> Event<K> {
///
/// `Deleted` objects are ignored, all objects mentioned by `Restarted` events are
/// emitted individually.
#[deprecated(since = "0.92.0", note = "unnecessary to flatten a single object")]
#[deprecated(
since = "0.92.0",
note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
)]
pub fn into_iter_applied(self) -> impl Iterator<Item = K> {
match self {
Self::Apply(obj) | Self::InitApply(obj) => Some(obj),
Expand All @@ -85,7 +88,10 @@ impl<K> Event<K> {
/// Note that `Deleted` events may be missed when restarting the stream. Use finalizers
/// or owner references instead if you care about cleaning up external resources after
/// deleted objects.
#[deprecated(since = "0.92.0", note = "unnecessary to flatten a single object")]
#[deprecated(
since = "0.92.0",
note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
)]
pub fn into_iter_touched(self) -> impl Iterator<Item = K> {
match self {
Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => Some(obj),
Expand Down Expand Up @@ -710,8 +716,8 @@ where
/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
/// will terminate eagerly as soon as they receive an [`Err`].
///
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`].
/// Direct users may want to flatten composite events via [`WatchStreamExt`]:
/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
///
/// ```no_run
/// use kube::{
Expand Down Expand Up @@ -773,8 +779,8 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
/// will terminate eagerly as soon as they receive an [`Err`].
///
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`].
/// Direct users may want to flatten composite events via [`WatchStreamExt`]:
/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
///
/// ```no_run
/// use kube::{
Expand Down
Loading