Skip to content

Commit

Permalink
catalog: Fix shadow timestamp discrepancies (#23219)
Browse files Browse the repository at this point in the history
The shadow catalog is a catalog implementation used for testing.
Internally it has a stash and persist catalog implementation. On every
operation it compares the results of each implementation and panics if
there's a difference.

The Coordinator updates the timestamps of every timeline on a set
interval, currently 10 seconds. Timestamps are currently stored in the
catalog. If the Coordinator is shut down while one of these updates is
in progress, then it's possible that only one of the catalog
implementations updated the timestamps, while the other didn't. This
makes it impossible to correctly test scenarios involving restarts with
the shadow catalog.

This commit fixes this issue by removing any discrepancies in the
timestamps between the stash and persist implementations when opening
the catalog. The shadow catalog treats the larger of the two timestamps
as correct and updates the other implementation accordingly.
  • Loading branch information
jkosh44 authored Nov 15, 2023
1 parent cac0d3f commit f4b55d4
Showing 1 changed file with 55 additions and 3 deletions.
58 changes: 55 additions & 3 deletions src/catalog/src/durable/impls/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::time::Duration;

Expand Down Expand Up @@ -88,7 +90,7 @@ where
);
let stash = stash?;
let persist = persist?;
Ok(Box::new(ShadowCatalogState { stash, persist }))
Ok(Box::new(ShadowCatalogState::new(stash, persist).await?))
}

async fn open_read_only(
Expand All @@ -106,7 +108,7 @@ where
);
let stash = stash?;
let persist = persist?;
Ok(Box::new(ShadowCatalogState { stash, persist }))
Ok(Box::new(ShadowCatalogState::new_read_only(stash, persist)))
}

async fn open(
Expand All @@ -129,7 +131,7 @@ where
);
let stash = stash?;
let persist = persist?;
Ok(Box::new(ShadowCatalogState { stash, persist }))
Ok(Box::new(ShadowCatalogState::new(stash, persist).await?))
}

async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
Expand Down Expand Up @@ -163,6 +165,56 @@ pub struct ShadowCatalogState {
pub persist: Box<dyn DurableCatalogState>,
}

impl ShadowCatalogState {
    /// Creates a writable shadow catalog state from the two underlying implementations.
    ///
    /// Before returning, any timestamp discrepancies between `stash` and `persist` are
    /// reconciled via [`ShadowCatalogState::fix_timestamps`].
    ///
    /// # Errors
    ///
    /// Returns a [`CatalogError`] if reading or writing timestamps in either implementation
    /// fails.
    async fn new(
        stash: Box<dyn DurableCatalogState>,
        persist: Box<dyn DurableCatalogState>,
    ) -> Result<ShadowCatalogState, CatalogError> {
        let mut state = ShadowCatalogState { stash, persist };
        state.fix_timestamps().await?;
        Ok(state)
    }

    /// Creates a shadow catalog state for read-only use.
    ///
    /// Unlike [`ShadowCatalogState::new`], no timestamp reconciliation is performed here, so
    /// the two implementations are left exactly as they were opened.
    fn new_read_only(
        stash: Box<dyn DurableCatalogState>,
        persist: Box<dyn DurableCatalogState>,
    ) -> ShadowCatalogState {
        ShadowCatalogState { stash, persist }
    }

    /// Reconciles the per-timeline timestamps of the stash and persist implementations.
    ///
    /// The Coordinator will update the timestamps of every timeline continuously on an interval.
    /// If we shut down the Coordinator while it's updating the timestamps, then it's possible that
    /// only one catalog implementation is updated, while the other is not. This will leave the two
    /// catalogs in an inconsistent state. Since this implementation is just used for tests, and
    /// that specific inconsistency is expected, we fix it during open.
    ///
    /// For every timeline the larger of the two timestamps is treated as correct and written to
    /// the implementation that is behind. A timeline that exists in only one implementation is
    /// copied to the other.
    ///
    /// # Errors
    ///
    /// Returns a [`CatalogError`] if reading or writing timestamps in either implementation
    /// fails.
    async fn fix_timestamps(&mut self) -> Result<(), CatalogError> {
        // Index the stash timestamps by timeline. Entries are removed as they are matched
        // against persist, so whatever remains afterwards exists only in the stash.
        let mut stash_timelines: BTreeMap<_, _> = self
            .stash
            .get_timestamps()
            .await?
            .into_iter()
            .map(|timeline_timestamp| (timeline_timestamp.timeline, timeline_timestamp.ts))
            .collect();

        for TimelineTimestamp { timeline, ts } in self.persist.get_timestamps().await? {
            match stash_timelines.remove(&timeline) {
                Some(stash_ts) => match stash_ts.cmp(&ts) {
                    // The stash is behind: advance it to persist's timestamp.
                    Ordering::Less => self.stash.set_timestamp(&timeline, ts).await?,
                    // Persist is behind: advance it to the stash's (larger) timestamp. Writing
                    // `ts` here would be a no-op and leave the discrepancy in place.
                    Ordering::Greater => self.persist.set_timestamp(&timeline, stash_ts).await?,
                    Ordering::Equal => {}
                },
                // The timeline is only known to persist: copy it into the stash.
                None => self.stash.set_timestamp(&timeline, ts).await?,
            }
        }

        // Timelines only known to the stash: copy them into persist.
        for (timeline, ts) in stash_timelines {
            self.persist.set_timestamp(&timeline, ts).await?;
        }

        Ok(())
    }
}

#[async_trait]
impl ReadOnlyDurableCatalogState for ShadowCatalogState {
fn epoch(&mut self) -> Epoch {
Expand Down

0 comments on commit f4b55d4

Please sign in to comment.