From 5343bdc116492b5937f8de0ad8ca84072b57681d Mon Sep 17 00:00:00 2001 From: Danil-Grigorev Date: Sat, 28 Dec 2024 19:46:28 +0100 Subject: [PATCH 1/5] Propagate delete events in shared streams Signed-off-by: Danil-Grigorev --- kube-runtime/src/reflector/dispatcher.rs | 15 ++++++++--- kube-runtime/src/reflector/object_ref.rs | 6 +++++ kube-runtime/src/reflector/store.rs | 33 ++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 1060dab2b..eb939d077 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -70,6 +70,11 @@ where pub(crate) fn subscribe(&self, reader: Store) -> ReflectHandle { ReflectHandle::new(reader, self.dispatch_tx.new_receiver()) } + + // Return a number of active subscribers to this shared sender. + pub(crate) fn subscribers(&self) -> usize { + self.dispatch_tx.receiver_count() - 1 + } } /// A handle to a shared stream reader @@ -132,10 +137,12 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); match ready!(this.rx.as_mut().poll_next(cx)) { - Some(obj_ref) => this - .reader - .get(&obj_ref) - .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), + Some(obj_ref) => if obj_ref.extra.remaining_lookups.is_some() { + this.reader.remove(&obj_ref) + } else { + this.reader.get(&obj_ref) + } + .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), None => Poll::Ready(None), } } diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index 9cfc4e028..ef2c02093 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -57,6 +57,7 @@ pub trait Lookup { extra: Extra { resource_version: self.resource_version().map(Cow::into_owned), uid: self.uid().map(Cow::into_owned), + remaining_lookups: None, }, } } @@ -156,6 +157,8 @@ pub struct Extra { pub resource_version: Option, /// The uid of the object pub uid: Option, + /// Number of remaining cache lookups on this reference + pub remaining_lookups: Option, } impl ObjectRef @@ -225,6 +228,7 @@ impl ObjectRef { extra: Extra { resource_version: None, uid: Some(owner.uid.clone()), + remaining_lookups: None, }, }) } else { @@ -271,6 +275,7 @@ impl From> for ObjectReference { extra: Extra { resource_version, uid, + .. }, } = val; ObjectReference { @@ -351,6 +356,7 @@ mod tests { extra: Extra { resource_version: Some("123".to_string()), uid: Some("638ffacd-f666-4402-ba10-7848c66ef576".to_string()), + remaining_lookups: None, }, ..minimal.clone() }; diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index d6d264dea..f2e72b6c1 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -107,8 +107,14 @@ where self.store.write().insert(key, obj); } watcher::Event::Delete(obj) => { - let key = obj.to_object_ref(self.dyntype.clone()); - self.store.write().remove(&key); + let mut key = obj.to_object_ref(self.dyntype.clone()); + let mut store = self.store.write(); + store.remove(&key); + if self.dispatcher.is_some() { + // Re-insert the entry with updated key, as insert on its own doesnt modify the key + key.extra.remaining_lookups = self.dispatcher.as_ref().map(|d| d.subscribers()); + store.insert(key, Arc::new(obj.clone())); + } } watcher::Event::Init => { self.buffer = AHashMap::new(); @@ -159,6 +165,12 @@ where } } + watcher::Event::Delete(obj) => { + let mut obj_ref = obj.to_object_ref(self.dyntype.clone()); + obj_ref.extra.remaining_lookups = Some(dispatcher.subscribers()); + dispatcher.broadcast(obj_ref).await; + } + _ => {} } } @@ -236,6 +248,23 @@ where .cloned() } + #[must_use] + pub fn remove(&self, key: &ObjectRef) -> Option> { + let mut store = self.store.write(); + store.remove_entry(key).map(|(k, obj)| { + let mut k = k.clone(); + match k.extra.remaining_lookups { + Some(..=1) | None => (), + Some(lookups) => { + k.extra.remaining_lookups = Some(lookups - 1); + store.insert(k, obj.clone()); + } + }; + + obj + }) + } + /// Return a full snapshot of the current values #[must_use] pub fn state(&self) -> Vec> { From d4ebe7b2e9a226fe303d7d9510fc748879f8d24b Mon Sep 17 00:00:00 2001 From: Danil-Grigorev Date: Sun, 29 Dec 2024 00:11:21 +0100 Subject: [PATCH 2/5] Clippy fixes Signed-off-by: Danil-Grigorev --- kube-runtime/src/reflector/object_ref.rs | 11 ++++++----- kube-runtime/src/reflector/store.rs | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index ef2c02093..7829f5494 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -272,11 +272,12 @@ impl From> for ObjectReference { dyntype: dt, name, namespace, - extra: Extra { - resource_version, - uid, - .. - }, + extra: + Extra { + resource_version, + uid, + .. + }, } = val; ObjectReference { api_version: Some(K::api_version(&dt).into_owned()), diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index f2e72b6c1..ea6578161 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -112,7 +112,7 @@ where store.remove(&key); if self.dispatcher.is_some() { // Re-insert the entry with updated key, as insert on its own doesnt modify the key - key.extra.remaining_lookups = self.dispatcher.as_ref().map(|d| d.subscribers()); + key.extra.remaining_lookups = self.dispatcher.as_ref().map(Dispatcher::subscribers); store.insert(key, Arc::new(obj.clone())); } } From 6b5fdcb34023bdc094ff4210053151ed97f667fd Mon Sep 17 00:00:00 2001 From: Danil-Grigorev Date: Sun, 29 Dec 2024 00:44:08 +0100 Subject: [PATCH 3/5] Fix tests Signed-off-by: Danil-Grigorev --- kube-runtime/src/reflector/dispatcher.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index eb939d077..2c5fb32a3 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -73,7 +73,7 @@ where // Return a number of active subscribers to this shared sender. pub(crate) fn subscribers(&self) -> usize { - self.dispatch_tx.receiver_count() - 1 + self.dispatch_tx.receiver_count() } } @@ -241,6 +241,7 @@ pub(crate) mod test { let (_, writer) = reflector::store_shared(10); let mut subscriber = pin!(writer.subscribe().unwrap()); + let mut other_subscriber = pin!(writer.subscribe().unwrap()); let mut reflect = pin!(st.reflect_shared(writer)); // Deleted events should be skipped by subscriber. @@ -248,7 +249,8 @@ pub(crate) mod test { poll!(reflect.next()), Poll::Ready(Some(Ok(Event::Delete(_)))) )); - assert_eq!(poll!(subscriber.next()), Poll::Pending); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(poll!(other_subscriber.next()), Poll::Ready(Some(foo.clone()))); assert!(matches!( poll!(reflect.next()), From ae5814034c2d25084c346ab5e67dd863dec94b6a Mon Sep 17 00:00:00 2001 From: Danil-Grigorev Date: Sun, 29 Dec 2024 00:47:09 +0100 Subject: [PATCH 4/5] Remove redundant cloning Signed-off-by: Danil-Grigorev --- kube-runtime/src/reflector/store.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index ea6578161..a99b7fce5 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -251,13 +251,12 @@ where #[must_use] pub fn remove(&self, key: &ObjectRef) -> Option> { let mut store = self.store.write(); - store.remove_entry(key).map(|(k, obj)| { - let mut k = k.clone(); - match k.extra.remaining_lookups { + store.remove_entry(key).map(|(mut key, obj)| { + match key.extra.remaining_lookups { Some(..=1) | None => (), Some(lookups) => { - k.extra.remaining_lookups = Some(lookups - 1); - store.insert(k, obj.clone()); + key.extra.remaining_lookups = Some(lookups - 1); + store.insert(key, obj.clone()); } }; From 61a9766bbe807f58a4309e71bfe464ec6f7b779f Mon Sep 17 00:00:00 2001 From: Danil-Grigorev Date: Sun, 29 Dec 2024 01:05:11 +0100 Subject: [PATCH 5/5] Add cache test Signed-off-by: Danil-Grigorev --- kube-runtime/src/reflector/dispatcher.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 2c5fb32a3..525d01ff6 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -152,8 +152,7 @@ where #[cfg(test)] pub(crate) mod test { use crate::{ - watcher::{Error, Event}, - WatchStreamExt, + reflector::ObjectRef, watcher::{Error, Event}, WatchStreamExt }; use std::{pin::pin, sync::Arc, task::Poll}; @@ -239,7 +238,7 @@ pub(crate) mod test { let foo = Arc::new(foo); let _bar = Arc::new(bar); - let (_, writer) = reflector::store_shared(10); + let (reader, writer) = reflector::store_shared(10); let mut subscriber = pin!(writer.subscribe().unwrap()); let mut other_subscriber = pin!(writer.subscribe().unwrap()); let mut reflect = pin!(st.reflect_shared(writer)); @@ -249,8 +248,11 @@ pub(crate) mod test { poll!(reflect.next()), Poll::Ready(Some(Ok(Event::Delete(_)))) )); + assert_eq!(reader.get(&ObjectRef::from_obj(&foo)), Some(foo.clone())); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(reader.get(&ObjectRef::from_obj(&foo)), Some(foo.clone())); assert_eq!(poll!(other_subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(reader.get(&ObjectRef::from_obj(&foo)), None); assert!(matches!( poll!(reflect.next()),