storage: Always initialize ingestion statistics #30975

Closed
storage: Simplify dataflow as_of computation
Previously, async storage workers would calculate the as_of of a
dataflow from the sinces of the remap shards of the dataflow's outputs.
If there was more than one distinct remap shard among the outputs, the
storage worker would panic. The only collection that is ever expected
to have a remap shard is the ingestion collection itself.

Furthermore, we are planning to remove the ingestion collection from
the outputs of the dataflow (in fact there's already a feature flag
that does this). If the ingestion is removed from the outputs, then no
output will have a remap shard, and the as_of will always be empty.
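To make the failure mode above concrete, here is a minimal sketch of the old accumulation logic, using simplified stand-in types (`Vec<u64>` in place of an `Antichain<T>`, and a plain slice of optional sinces in place of the real output metadata; none of these are Materialize's actual types):

```rust
// The old approach: the as_of starts empty and is only ever populated
// from an output that carries a remap shard. Once the ingestion
// collection is removed from the outputs, no output carries one, so the
// as_of silently stays empty.
fn old_as_of(output_remap_sinces: &[Option<Vec<u64>>]) -> Vec<u64> {
    let mut as_of: Vec<u64> = Vec::new(); // Antichain::new() in the real code
    for since in output_remap_sinces {
        if let Some(s) = since {
            // Only reached when some output has a remap shard.
            as_of = s.clone();
        }
    }
    as_of
}

fn main() {
    // With the ingestion removed from the outputs, nothing has a remap
    // shard, and the computed as_of is empty:
    assert!(old_as_of(&[None, None]).is_empty());
}
```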

This commit simplifies the existing as_of calculation and fixes the
as_of calculation when the ingestion collection is removed from the
outputs. It does this by calculating the as_of directly from the
ingestion's remap shard. Additionally, it asserts that if any of the
outputs have a remap shard, then it must be equal to the ingestion's
remap shard.
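The new computation described above can be sketched as follows. All types here (`ShardId`, `Ingestion`, `Output`, and the `since_of` lookup) are hypothetical stand-ins for illustration, not Materialize's real API:

```rust
type ShardId = u64;
type Frontier = Vec<u64>; // stand-in for timely's Antichain<T>

struct Output {
    remap_shard: Option<ShardId>,
}

struct Ingestion {
    remap_shard: ShardId,
    outputs: Vec<Output>,
}

/// Pretend lookup of a shard's since frontier.
fn since_of(shard: ShardId) -> Frontier {
    vec![shard * 10] // arbitrary stand-in value
}

fn compute_as_of(ingestion: &Ingestion) -> Frontier {
    // The as_of now comes directly from the ingestion's remap shard...
    let as_of = since_of(ingestion.remap_shard);
    // ...and any output that does carry a remap shard must agree with it.
    for output in &ingestion.outputs {
        if let Some(shard) = output.remap_shard {
            assert_eq!(
                shard, ingestion.remap_shard,
                "ingestion with multiple remap shards"
            );
        }
    }
    as_of
}

fn main() {
    // Works whether or not any output carries a remap shard, which is
    // what the feature-flagged removal of the ingestion output needs.
    let ingestion = Ingestion {
        remap_shard: 7,
        outputs: vec![Output { remap_shard: None }, Output { remap_shard: Some(7) }],
    };
    assert_eq!(compute_as_of(&ingestion), vec![70]);
}
```

Note the assert preserves the old panic-on-mismatch behavior while no longer depending on the outputs to supply the as_of at all.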

Works towards resolving #MaterializeInc/database-issues/issues/8620
jkosh44 committed Jan 9, 2025
commit dff8199644b8cc0ea653fa28c980b1c67e35a85a
110 changes: 52 additions & 58 deletions src/storage/src/storage_state/async_storage_worker.rs
@@ -217,9 +217,55 @@ impl<T: Timestamp + Lattice + Codec64 + Display + Sync> AsyncStorageWorker<T> {
// arbitrarily hold back collections to perform historical queries and when
// the storage command protocol is updated such that these calculations are
// performed by the controller and not here.
let mut as_of = Antichain::new();
let mut resume_uppers = BTreeMap::new();
let mut seen_remap_shard = None;

// TODO(petrosagg): The as_of of the ingestion should normally be based
// on the since frontiers of its outputs. Even though the storage
// controller makes sure to make downgrade decisions in an organized
// and ordered fashion, it then proceeds to persist them in an
        // asynchronous and disorganized fashion. The net effect is
        // that upon restart, or upon observing the persist state like this
        // function, one can see nonsensical results like the since of A being
        // in advance of B even when B depends on A! This can happen because the
// downgrade of B gets reordered and lost. Here is our best attempt at
// playing detective of what the controller meant to do by blindly
// assuming that the since of the remap shard is a suitable since
// frontier without consulting the since frontier of the outputs. One
// day we will enforce order to chaos and this comment will be deleted.
let remap_shard = ingestion_description
.ingestion_metadata
.remap_shard
.expect("ingestions must have a remap shard");
let client = persist_clients
.open(
ingestion_description
.ingestion_metadata
.persist_location
.clone(),
)
.await
.expect("error creating persist client");
let read_handle = client
.open_leased_reader::<SourceData, (), T, Diff>(
remap_shard,
Arc::new(ingestion_description.desc.connection.timestamp_desc()),
Arc::new(UnitSchema),
Diagnostics {
shard_name: ingestion_description
.remap_collection_id
.to_string(),
handle_purpose: format!("resumption data for {}", id),
},
false,
)
.await
.unwrap();
let as_of = read_handle.since().clone();
mz_ore::task::spawn(move || "deferred_expire", async move {
tokio::time::sleep(std::time::Duration::from_secs(300)).await;
read_handle.expire().await;
});
let seen_remap_shard = remap_shard.clone();

for (id, export) in ingestion_description.source_exports.iter() {
// Explicit destructuring to force a compile error when the metadata change
@@ -263,63 +309,11 @@ impl<T: Timestamp + Lattice + Codec64 + Display + Sync> AsyncStorageWorker<T> {
resume_uppers.insert(*id, upper);
write_handle.expire().await;

// TODO(petrosagg): The as_of of the ingestion should normally be based
// on the since frontiers of its outputs. Even though the storage
// controller makes sure to make downgrade decisions in an organized
// and ordered fashion, it then proceeds to persist them in an
// asynchronous and disorganized fashion to persist. The net effect is
// that upon restart, or upon observing the persist state like this
// function, one can see non-sensical results like the since of A be in
// advance of B even when B depends on A! This can happen because the
// downgrade of B gets reordered and lost. Here is our best attempt at
// playing detective of what the controller meant to do by blindly
// assuming that the since of the remap shard is a suitable since
// frontier without consulting the since frontier of the outputs. One
// day we will enforce order to chaos and this comment will be deleted.
if let Some(remap_shard) = remap_shard {
match seen_remap_shard.as_ref() {
None => {
let read_handle = client
.open_leased_reader::<SourceData, (), T, Diff>(
*remap_shard,
Arc::new(
ingestion_description
.desc
.connection
.timestamp_desc(),
),
Arc::new(UnitSchema),
Diagnostics {
shard_name: ingestion_description
.remap_collection_id
.to_string(),
handle_purpose: format!(
"resumption data for {}",
id
),
},
false,
)
.await
.unwrap();
as_of.clone_from(read_handle.since());
mz_ore::task::spawn(
move || "deferred_expire",
async move {
tokio::time::sleep(std::time::Duration::from_secs(
300,
))
.await;
read_handle.expire().await;
},
);
seen_remap_shard = Some(remap_shard.clone());
}
Some(shard) => assert_eq!(
shard, remap_shard,
"ingestion with multiple remap shards"
),
}
assert_eq!(
seen_remap_shard, *remap_shard,
"ingestion with multiple remap shards"
);
}
}