From f4b55d40ea34835339a3e0848733279b6515dda5 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 15 Nov 2023 13:03:24 -0500 Subject: [PATCH] catalog: Fix shadow timestamp discrepancies (#23219) The shadow catalog is a catalog implementation used for testing. Internally it has a stash and persist catalog implementation. On every operation it compares the results of each implementation and panics if there's a difference. The Coordinator will update the timestamps of every timeline on a set interval, currently set to 10 seconds. Timestamps are currently stored in the catalog. If the Coordinator is shut down during this interval, then it's possible that only one of the catalog implementations updated the timestamps, while the other didn't. This makes it impossible to correctly test scenarios involving restarts with the shadow catalog. This commit fixes this issue, by removing any discrepancies in the timestamps across the stash and persist when opening the catalog. The shadow catalog will treat the larger of the two timestamps as correct and update accordingly. --- src/catalog/src/durable/impls/shadow.rs | 58 +++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/src/catalog/src/durable/impls/shadow.rs b/src/catalog/src/durable/impls/shadow.rs index 5af70cdd0a3f7..8bedeb2fed14e 100644 --- a/src/catalog/src/durable/impls/shadow.rs +++ b/src/catalog/src/durable/impls/shadow.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::cmp::Ordering; +use std::collections::BTreeMap; use std::fmt::Debug; use std::time::Duration; @@ -88,7 +90,7 @@ where ); let stash = stash?; let persist = persist?; - Ok(Box::new(ShadowCatalogState { stash, persist })) + Ok(Box::new(ShadowCatalogState::new(stash, persist).await?)) } async fn open_read_only( @@ -106,7 +108,7 @@ where ); let stash = stash?; let persist = persist?; - Ok(Box::new(ShadowCatalogState { stash, persist })) + Ok(Box::new(ShadowCatalogState::new_read_only(stash, persist))) } async fn open( @@ -129,7 +131,7 @@ where ); let stash = stash?; let persist = persist?; - Ok(Box::new(ShadowCatalogState { stash, persist })) + Ok(Box::new(ShadowCatalogState::new(stash, persist).await?)) } async fn open_debug(mut self: Box) -> Result { @@ -163,6 +165,56 @@ pub struct ShadowCatalogState { pub persist: Box, } +impl ShadowCatalogState { + async fn new( + stash: Box, + persist: Box, + ) -> Result { + let mut state = ShadowCatalogState { stash, persist }; + state.fix_timestamps().await?; + Ok(state) + } + + fn new_read_only( + stash: Box, + persist: Box, + ) -> ShadowCatalogState { + ShadowCatalogState { stash, persist } + } + + /// The Coordinator will update the timestamps of every timeline continuously on an interval. + /// If we shut down the Coordinator while it's updating the timestamps, then it's possible that + /// only one catalog implementation is updated, while the other is not. This will leave the two + /// catalogs in an inconsistent state. Since this implementation is just used for tests, and + /// that specific inconsistency is expected, we fix it during open. + async fn fix_timestamps(&mut self) -> Result<(), CatalogError> { + let mut stash_timelines: BTreeMap<_, _> = self + .stash + .get_timestamps() + .await? + .into_iter() + .map(|timeline_timestamp| (timeline_timestamp.timeline, timeline_timestamp.ts)) + .collect(); + + for TimelineTimestamp { timeline, ts } in self.persist.get_timestamps().await? { + match stash_timelines.remove(&timeline) { + Some(stash_ts) => match stash_ts.cmp(&ts) { + Ordering::Less => self.stash.set_timestamp(&timeline, ts).await?, + Ordering::Greater => self.persist.set_timestamp(&timeline, ts).await?, + Ordering::Equal => {} + }, + None => self.stash.set_timestamp(&timeline, ts).await?, + } + } + + for (timeline, ts) in stash_timelines { + self.persist.set_timestamp(&timeline, ts).await?; + } + + Ok(()) + } +} + #[async_trait] impl ReadOnlyDurableCatalogState for ShadowCatalogState { fn epoch(&mut self) -> Epoch {