Skip to content

Commit

Permalink
Merge pull request #25119 from teskje/mv-as-of
Browse files Browse the repository at this point in the history
adapter: use initial MV as-of during bootstrap
  • Loading branch information
teskje authored Feb 15, 2024
2 parents 640baaa + d474190 commit a6d529d
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 59 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,7 @@ mod builtin_migration_tests {
non_null_assertions: vec![],
custom_logical_compaction_window: None,
refresh_schedule: None,
initial_as_of: None,
})
}
SimplifiedItem::Index { on } => {
Expand Down
7 changes: 7 additions & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use once_cell::sync::Lazy;
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{info, warn};

Expand Down Expand Up @@ -986,6 +987,11 @@ impl CatalogState {
typ.column_types[i].nullable = false;
}
let desc = RelationDesc::new(typ, materialized_view.column_names);

let initial_as_of = materialized_view
.as_of
.map(|time| Antichain::from_elem(time.into()));

CatalogItem::MaterializedView(MaterializedView {
create_sql: materialized_view.create_sql,
raw_expr,
Expand All @@ -996,6 +1002,7 @@ impl CatalogState {
non_null_assertions: materialized_view.non_null_assertions,
custom_logical_compaction_window: materialized_view.compaction_window,
refresh_schedule: materialized_view.refresh_schedule,
initial_as_of,
})
}
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
Expand Down
10 changes: 6 additions & 4 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1930,10 +1930,12 @@ impl Coordinator {
Some((id, collection_desc))
}
CatalogItem::MaterializedView(mv) => {
let collection_desc = CollectionDescription::from_desc(
mv.desc.clone(),
DataSourceOther::Compute,
);
let collection_desc = CollectionDescription {
desc: mv.desc.clone(),
data_source: DataSource::Other(DataSourceOther::Compute),
since: mv.initial_as_of.clone(),
status_collection_id: None,
};
Some((id, collection_desc))
}
_ => None,
Expand Down
9 changes: 9 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,14 @@ impl Coordinator {
}

Statement::CreateMaterializedView(mut cmvs) => {
// `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
// only used for storing initial frontiers in the catalog.
if cmvs.as_of.is_some() {
return ctx.retire(Err(AdapterError::Unsupported(
"CREATE MATERIALIZED VIEW ... AS OF statements",
)));
}

let mz_now = match self
.resolve_mz_now_for_create_materialized_view(
&cmvs,
Expand Down Expand Up @@ -719,6 +727,7 @@ impl Coordinator {
in_cluster: cmvs.in_cluster,
query: cmvs.query,
with_options: cmvs.with_options,
as_of: None,
});

// (Purifying CreateMaterializedView doesn't happen async, so no need to send
Expand Down
109 changes: 69 additions & 40 deletions src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::lattice::Lattice;
use maplit::btreemap;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
use mz_expr::refresh_schedule::RefreshSchedule;
use mz_expr::CollectionPlan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_sql::catalog::CatalogError;
use mz_sql::names::{ObjectId, ResolvedIds};
use mz_sql::plan;
use mz_sql_parser::ast;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};
use timely::progress::Antichain;
use tracing::{instrument, Span};
Expand All @@ -34,7 +37,7 @@ use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize, OverrideFrom};
use crate::session::Session;
use crate::util::ResultExt;
use crate::{catalog, AdapterNotice, ExecuteContext, TimestampProvider};
use crate::{catalog, AdapterNotice, CollectionIdBundle, ExecuteContext, TimestampProvider};

#[async_trait::async_trait(?Send)]
impl Staged for CreateMaterializedViewStage {
Expand Down Expand Up @@ -394,7 +397,7 @@ impl Coordinator {
name,
materialized_view:
plan::MaterializedView {
create_sql,
mut create_sql,
expr: raw_expr,
cluster_id,
non_null_assertions,
Expand All @@ -413,6 +416,24 @@ impl Coordinator {
..
}: CreateMaterializedViewFinish,
) -> Result<StageResult<Box<CreateMaterializedViewStage>>, AdapterError> {
// Timestamp selection
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
let (as_of, until) = self.select_timestamps(id_bundle, refresh_schedule.as_ref())?;

// Update the `create_sql` with the selected `as_of`. This is how we make sure the `as_of`
// is persisted to the catalog and can be relied on during bootstrapping.
if let Some(as_of_ts) = as_of.as_option() {
let stmt = mz_sql::parse::parse(&create_sql)
.expect("create_sql is valid")
.into_element()
.ast;
let ast::Statement::CreateMaterializedView(mut stmt) = stmt else {
panic!("unexpected statement type");
};
stmt.as_of = Some(as_of_ts.into());
create_sql = stmt.to_ast_string_stable();
}

let ops = itertools::chain(
drop_ids
.into_iter()
Expand All @@ -430,45 +451,14 @@ impl Coordinator {
cluster_id,
non_null_assertions,
custom_logical_compaction_window: compaction_window,
refresh_schedule: refresh_schedule.clone(),
refresh_schedule,
initial_as_of: Some(as_of.clone()),
}),
owner_id: *session.current_role_id(),
}),
)
.collect::<Vec<_>>();

// Timestamp selection
let as_of = {
// Normally, `as_of` should be the least_valid_read.
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
let mut as_of = self.least_valid_read(&id_bundle);
// But for MVs with non-trivial REFRESH schedules, it's important to set the
// `as_of` to the first refresh. This is because we'd like queries on the MV to
// block until the first refresh (rather than to show an empty MV).
if let Some(refresh_schedule) = &refresh_schedule {
if let Some(as_of_ts) = as_of.as_option() {
let Some(rounded_up_ts) = refresh_schedule.round_up_timestamp(*as_of_ts) else {
return Err(AdapterError::MaterializedViewWouldNeverRefresh(
refresh_schedule.last_refresh().expect("if round_up_timestamp returned None, then there should be a last refresh"),
*as_of_ts
));
};
as_of = Antichain::from_elem(rounded_up_ts);
} else {
// The `as_of` should never be empty, because then the MV would be unreadable.
soft_panic_or_log!("creating a materialized view with an empty `as_of`");
}
}
as_of
};

// If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
// (If the `try_step_forward` fails, then no need to set an `until`, because it's not possible to get any data
// beyond that last refresh time, because there are no times beyond that time.)
let until = refresh_schedule
.and_then(|s| s.last_refresh())
.and_then(|r| r.try_step_forward());

// Pre-allocate a vector of transient GlobalIds for each notice.
let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
.take(global_lir_plan.df_meta().optimizer_notices.len())
Expand All @@ -488,10 +478,7 @@ impl Coordinator {
let (mut df_desc, df_meta) = global_lir_plan.unapply();

df_desc.set_as_of(as_of.clone());

if let Some(until) = until {
df_desc.until.meet_assign(&Antichain::from_elem(until));
}
df_desc.until = until;

// Emit notices.
coord.emit_optimizer_notices(session, &df_meta.optimizer_notices);
Expand Down Expand Up @@ -575,6 +562,48 @@ impl Coordinator {
.map(StageResult::Response)
}

/// Select the initial `as_of` and `until` frontiers for a materialized view.
fn select_timestamps(
&self,
id_bundle: CollectionIdBundle,
refresh_schedule: Option<&RefreshSchedule>,
) -> Result<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>), AdapterError> {
// Normally, `as_of` should be the least_valid_read.
let mut as_of = self.least_valid_read(&id_bundle);

// But for MVs with non-trivial REFRESH schedules, it's important to set the `as_of` to the
// first refresh. This is because we'd like queries on the MV to block until the first
// refresh (rather than to show an empty MV).
if let Some(refresh_schedule) = &refresh_schedule {
if let Some(as_of_ts) = as_of.as_option() {
if let Some(rounded_up_ts) = refresh_schedule.round_up_timestamp(*as_of_ts) {
as_of = Antichain::from_elem(rounded_up_ts);
} else {
let last_refresh = refresh_schedule.last_refresh().expect(
"if round_up_timestamp returned None, then there should be a last refresh",
);
return Err(AdapterError::MaterializedViewWouldNeverRefresh(
last_refresh,
*as_of_ts,
));
}
} else {
// The `as_of` should never be empty, because then the MV would be unreadable.
soft_panic_or_log!("creating a materialized view with an empty `as_of`");
}
}

// If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh.
// (If the `try_step_forward` fails, then no need to set an `until`, because it's not possible to get any data
// beyond that last refresh time, because there are no times beyond that time.)
let until_ts = refresh_schedule
.and_then(|s| s.last_refresh())
.and_then(|r| r.try_step_forward());
let until = Antichain::from_iter(until_ts);

Ok((as_of, until))
}

#[instrument(skip_all)]
fn create_materialized_view_explain(
&mut self,
Expand Down
2 changes: 2 additions & 0 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use mz_adapter_types::connection::ConnectionId;
use once_cell::sync::Lazy;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;

use mz_compute_client::logging::LogVariant;
use mz_controller::clusters::{
Expand Down Expand Up @@ -690,6 +691,7 @@ pub struct MaterializedView {
pub non_null_assertions: Vec<usize>,
pub custom_logical_compaction_window: Option<CompactionWindow>,
pub refresh_schedule: Option<RefreshSchedule>,
pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}

#[derive(Debug, Clone, Serialize)]
Expand Down
6 changes: 6 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,7 @@ pub struct CreateMaterializedViewStatement<T: AstInfo> {
pub columns: Vec<Ident>,
pub in_cluster: Option<T::ClusterName>,
pub query: Query<T>,
pub as_of: Option<u64>,
pub with_options: Vec<MaterializedViewOption<T>>,
}

Expand Down Expand Up @@ -1310,6 +1311,11 @@ impl<T: AstInfo> AstDisplay for CreateMaterializedViewStatement<T> {

f.write_str(" AS ");
f.write_node(&self.query);

if let Some(time) = &self.as_of {
f.write_str(" AS OF ");
f.write_str(time);
}
}
}
impl_display_t!(CreateMaterializedViewStatement);
Expand Down
26 changes: 26 additions & 0 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3255,6 +3255,7 @@ impl<'a> Parser<'a> {

self.expect_keyword(AS)?;
let query = self.parse_query()?;
let as_of = self.parse_optional_internal_as_of()?;

Ok(Statement::CreateMaterializedView(
CreateMaterializedViewStatement {
Expand All @@ -3263,6 +3264,7 @@ impl<'a> Parser<'a> {
columns,
in_cluster,
query,
as_of,
with_options,
},
))
Expand Down Expand Up @@ -6999,6 +7001,30 @@ impl<'a> Parser<'a> {
}
}

/// Parse `AS OF`, if present.
///
/// In contrast to `parse_optional_as_of`, this parser only supports `AS OF <time>` syntax and
/// directly returns an `u64`. It is only meant to be used for internal SQL syntax.
fn parse_optional_internal_as_of(&mut self) -> Result<Option<u64>, ParserError> {
fn try_parse_u64(parser: &mut Parser) -> Option<u64> {
let value = parser.parse_value().ok()?;
let Value::Number(s) = value else { return None };
s.parse().ok()
}

if self.parse_keywords(&[AS, OF]) {
match try_parse_u64(self) {
Some(time) => Ok(Some(time)),
None => {
self.prev_token();
self.expected(self.peek_pos(), "`u64` literal", self.peek_token())
}
}
} else {
Ok(None)
}
}

/// Parse a comma-delimited list of projections after SELECT
fn parse_select_item(&mut self) -> Result<SelectItem<Raw>, ParserError> {
if self.consume_token(&Token::Star) {
Expand Down
Loading

0 comments on commit a6d529d

Please sign in to comment.