diff --git a/src/catalog/src/durable/impls/shadow.rs b/src/catalog/src/durable/impls/shadow.rs index 5af70cdd0a3f7..8bedeb2fed14e 100644 --- a/src/catalog/src/durable/impls/shadow.rs +++ b/src/catalog/src/durable/impls/shadow.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::cmp::Ordering; +use std::collections::BTreeMap; use std::fmt::Debug; use std::time::Duration; @@ -88,7 +90,7 @@ where ); let stash = stash?; let persist = persist?; - Ok(Box::new(ShadowCatalogState { stash, persist })) + Ok(Box::new(ShadowCatalogState::new(stash, persist).await?)) } async fn open_read_only( @@ -106,7 +108,7 @@ where ); let stash = stash?; let persist = persist?; - Ok(Box::new(ShadowCatalogState { stash, persist })) + Ok(Box::new(ShadowCatalogState::new_read_only(stash, persist))) } async fn open( @@ -129,7 +131,7 @@ where ); let stash = stash?; let persist = persist?; - Ok(Box::new(ShadowCatalogState { stash, persist })) + Ok(Box::new(ShadowCatalogState::new(stash, persist).await?)) } async fn open_debug(mut self: Box) -> Result { @@ -163,6 +165,56 @@ pub struct ShadowCatalogState { pub persist: Box, } +impl ShadowCatalogState { + async fn new( + stash: Box, + persist: Box, + ) -> Result { + let mut state = ShadowCatalogState { stash, persist }; + state.fix_timestamps().await?; + Ok(state) + } + + fn new_read_only( + stash: Box, + persist: Box, + ) -> ShadowCatalogState { + ShadowCatalogState { stash, persist } + } + + /// The Coordinator will update the timestamps of every timeline continuously on an interval. + /// If we shut down the Coordinator while it's updating the timestamps, then it's possible that + /// only one catalog implementation is updated, while the other is not. This will leave the two + /// catalogs in an inconsistent state. Since this implementation is just used for tests, and + /// that specific inconsistency is expected, we fix it during open. + async fn fix_timestamps(&mut self) -> Result<(), CatalogError> { + let mut stash_timelines: BTreeMap<_, _> = self + .stash + .get_timestamps() + .await? + .into_iter() + .map(|timeline_timestamp| (timeline_timestamp.timeline, timeline_timestamp.ts)) + .collect(); + + for TimelineTimestamp { timeline, ts } in self.persist.get_timestamps().await? { + match stash_timelines.remove(&timeline) { + Some(stash_ts) => match stash_ts.cmp(&ts) { + Ordering::Less => self.stash.set_timestamp(&timeline, ts).await?, + Ordering::Greater => self.persist.set_timestamp(&timeline, ts).await?, + Ordering::Equal => {} + }, + None => self.stash.set_timestamp(&timeline, ts).await?, + } + } + + for (timeline, ts) in stash_timelines { + self.persist.set_timestamp(&timeline, ts).await?; + } + + Ok(()) + } +} + #[async_trait] impl ReadOnlyDurableCatalogState for ShadowCatalogState { fn epoch(&mut self) -> Epoch {