diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 1060dab2b..525d01ff6 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -70,6 +70,11 @@ where pub(crate) fn subscribe(&self, reader: Store) -> ReflectHandle { ReflectHandle::new(reader, self.dispatch_tx.new_receiver()) } + + // Return a number of active subscribers to this shared sender. + pub(crate) fn subscribers(&self) -> usize { + self.dispatch_tx.receiver_count() + } } /// A handle to a shared stream reader @@ -132,10 +137,12 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); match ready!(this.rx.as_mut().poll_next(cx)) { - Some(obj_ref) => this - .reader - .get(&obj_ref) - .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), + Some(obj_ref) => if obj_ref.extra.remaining_lookups.is_some() { + this.reader.remove(&obj_ref) + } else { + this.reader.get(&obj_ref) + } + .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), None => Poll::Ready(None), } } @@ -145,8 +152,7 @@ where #[cfg(test)] pub(crate) mod test { use crate::{ - watcher::{Error, Event}, - WatchStreamExt, + reflector::ObjectRef, watcher::{Error, Event}, WatchStreamExt }; use std::{pin::pin, sync::Arc, task::Poll}; @@ -232,8 +238,9 @@ pub(crate) mod test { let foo = Arc::new(foo); let _bar = Arc::new(bar); - let (_, writer) = reflector::store_shared(10); + let (reader, writer) = reflector::store_shared(10); let mut subscriber = pin!(writer.subscribe().unwrap()); + let mut other_subscriber = pin!(writer.subscribe().unwrap()); let mut reflect = pin!(st.reflect_shared(writer)); // Deleted events should be skipped by subscriber. @@ -241,7 +248,11 @@ pub(crate) mod test { poll!(reflect.next()), Poll::Ready(Some(Ok(Event::Delete(_)))) )); - assert_eq!(poll!(subscriber.next()), Poll::Pending); + assert_eq!(reader.get(&ObjectRef::from_obj(&foo)), Some(foo.clone())); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(reader.get(&ObjectRef::from_obj(&foo)), Some(foo.clone())); + assert_eq!(poll!(other_subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(reader.get(&ObjectRef::from_obj(&foo)), None); assert!(matches!( poll!(reflect.next()), diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index 9cfc4e028..7829f5494 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -57,6 +57,7 @@ pub trait Lookup { extra: Extra { resource_version: self.resource_version().map(Cow::into_owned), uid: self.uid().map(Cow::into_owned), + remaining_lookups: None, }, } } @@ -156,6 +157,8 @@ pub struct Extra { pub resource_version: Option, /// The uid of the object pub uid: Option, + /// Number of remaining cache lookups on this reference + pub remaining_lookups: Option, } impl ObjectRef @@ -225,6 +228,7 @@ impl ObjectRef { extra: Extra { resource_version: None, uid: Some(owner.uid.clone()), + remaining_lookups: None, }, }) } else { @@ -268,10 +272,12 @@ impl From> for ObjectReference { dyntype: dt, name, namespace, - extra: Extra { - resource_version, - uid, - }, + extra: + Extra { + resource_version, + uid, + .. + }, } = val; ObjectReference { api_version: Some(K::api_version(&dt).into_owned()), @@ -351,6 +357,7 @@ mod tests { extra: Extra { resource_version: Some("123".to_string()), uid: Some("638ffacd-f666-4402-ba10-7848c66ef576".to_string()), + remaining_lookups: None, }, ..minimal.clone() }; diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index d6d264dea..a99b7fce5 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -107,8 +107,14 @@ where self.store.write().insert(key, obj); } watcher::Event::Delete(obj) => { - let key = obj.to_object_ref(self.dyntype.clone()); - self.store.write().remove(&key); + let mut key = obj.to_object_ref(self.dyntype.clone()); + let mut store = self.store.write(); + store.remove(&key); + if self.dispatcher.is_some() { + // Re-insert the entry with updated key, as insert on its own doesnt modify the key + key.extra.remaining_lookups = self.dispatcher.as_ref().map(Dispatcher::subscribers); + store.insert(key, Arc::new(obj.clone())); + } } watcher::Event::Init => { self.buffer = AHashMap::new(); @@ -159,6 +165,12 @@ where } } + watcher::Event::Delete(obj) => { + let mut obj_ref = obj.to_object_ref(self.dyntype.clone()); + obj_ref.extra.remaining_lookups = Some(dispatcher.subscribers()); + dispatcher.broadcast(obj_ref).await; + } + _ => {} } } @@ -236,6 +248,22 @@ where .cloned() } + #[must_use] + pub fn remove(&self, key: &ObjectRef) -> Option> { + let mut store = self.store.write(); + store.remove_entry(key).map(|(mut key, obj)| { + match key.extra.remaining_lookups { + Some(..=1) | None => (), + Some(lookups) => { + key.extra.remaining_lookups = Some(lookups - 1); + store.insert(key, obj.clone()); + } + }; + + obj + }) + } + /// Return a full snapshot of the current values #[must_use] pub fn state(&self) -> Vec> {