Skip to content

Commit

Permalink
Merge pull request #25231 from ggevay/refresh_mz_now_least_valid_read
Browse files Browse the repository at this point in the history
Advance REFRESH `mz_now` resolution to `least_valid_read`
  • Loading branch information
ggevay authored Feb 14, 2024
2 parents 78446a6 + 7689cca commit 42f2894
Showing 1 changed file with 38 additions and 36 deletions.
74 changes: 38 additions & 36 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
//! client via some external Materialize API (ex: HTTP and psql).
use differential_dataflow::lattice::Lattice;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

Expand Down Expand Up @@ -47,7 +48,7 @@ use mz_sql_parser::ast::{
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{debug_span, instrument, Instrument};
use tracing::{debug_span, instrument, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::command::{
Expand All @@ -63,7 +64,7 @@ use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{
AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
};
use crate::{catalog, metrics, AppendWebhookError, ExecuteContext};
use crate::{catalog, metrics, AppendWebhookError, ExecuteContext, TimestampProvider};

use super::ExecuteContextExtra;

Expand Down Expand Up @@ -697,7 +698,8 @@ impl Coordinator {
.resolve_mz_now_for_create_materialized_view(
&cmvs,
&resolved_ids,
Some(ctx.session_mut()),
ctx.session_mut(),
true,
)
.await
{
Expand Down Expand Up @@ -738,7 +740,12 @@ impl Coordinator {
}) => {
let mut cmvs = *box_cmvs;
let mz_now = match self
.resolve_mz_now_for_create_materialized_view(&cmvs, &resolved_ids, None)
.resolve_mz_now_for_create_materialized_view(
&cmvs,
&resolved_ids,
ctx.session_mut(),
false,
)
.await
{
Ok(mz_now) => mz_now,
Expand Down Expand Up @@ -784,7 +791,8 @@ impl Coordinator {
&mut self,
cmvs: &CreateMaterializedViewStatement<Aug>,
resolved_ids: &ResolvedIds,
acquire_read_holds_for: Option<&mut Session>,
session: &mut Session,
acquire_read_holds: bool,
) -> Result<Option<Timestamp>, AdapterError> {
// (This won't be the same timestamp as the system table inserts, unfortunately.)
if cmvs
Expand All @@ -804,40 +812,34 @@ impl Coordinator {
let timeline = timeline_context
.timeline()
.unwrap_or(&Timeline::EpochMilliseconds);
let timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
// TODO: It might be good to take into account `least_valid_read` in addition to
// the oracle's `read_ts`, but there are two problems:
// 1. At this point, we don't know which indexes would be used. We could do an
// overestimation here by grabbing the ids of all indexes that are on ids
// involved in the query (`sufficient_collections_all_clusters`).
// 2. For a peek, when the `least_valid_read` is later than the oracle's
// `read_ts`, then the peek doesn't return before it completes at the chosen
// timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear
// whether we want to make it block until the chosen time. If it doesn't block,
// then a REFRESH AT CREATION wouldn't be linearized with the CREATE MATERIALIZED
// VIEW statement, in the sense that a query from the MV after its creation might
// see input changes that happened after the CREATE MATERIALIZED VIEW statement
// returned.
//
// Note: The Adapter is usually keeping a read hold of all objects at the oracle
// read timestamp, so `least_valid_read` usually won't actually be later than
// the oracle's `read_ts`. (see `Coordinator::advance_timelines`)
//
// Note 2: If we choose a timestamp here that is earlier than
// `least_valid_read`, that is somewhat bad, but not catastrophic: The only
// bad thing that happens is that we won't perform that refresh that was
// specified to be at `mz_now()` (which is usually the initial refresh)
// (similarly to how we don't perform refreshes that were specified to be in the
// past).

if let Some(session) = acquire_read_holds_for {
let catalog = self.catalog().for_session(session);
let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
// Let's start with the timestamp oracle read timestamp.
let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;

let ids = self
.index_oracle(cluster)
.sufficient_collections(resolved_ids.0.iter());
// If `least_valid_read` is later than the oracle, then advance to that time.
// If we didn't do this, then there would be a danger of missing the first refresh,
// which might cause the materialized view to be unreadable for hours. This might
// be what was happening here:
// https://github.com/MaterializeInc/materialize/issues/24288#issuecomment-1931856361
//
// In the long term, it would be good to actually block the MV creation statement
// until `least_valid_read`. https://github.com/MaterializeInc/materialize/issues/25127
// Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
// with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
// after its creation might see input changes that happened after the CREATE MATERIALIZED
// VIEW statement returned.
let catalog = self.catalog().for_session(session);
let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
let ids = self
.index_oracle(cluster)
.sufficient_collections(resolved_ids.0.iter());
let oracle_timestamp = timestamp;
timestamp.advance_by(self.least_valid_read(&ids).borrow());
if oracle_timestamp != timestamp {
warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
}

if acquire_read_holds {
self.acquire_read_holds_auto_cleanup(session, timestamp, &ids);
}

Expand Down

0 comments on commit 42f2894

Please sign in to comment.