diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 809c5fff2cb2c..a994b52ee2706 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -365,9 +365,7 @@ pub enum RealTimeRecencyContext { timeline_context: TimelineContext, oracle_read_ts: Option, source_ids: BTreeSet, - in_immediate_multi_stmt_txn: bool, optimizer: optimize::peek::Optimizer, - global_mir_plan: optimize::peek::GlobalMirPlan, peek_ctx: PeekContext, }, } @@ -392,9 +390,10 @@ pub enum PeekContext { #[derive(Debug)] pub enum PeekStage { Validate(PeekStageValidate), - Timestamp(PeekStageTimestamp), - OptimizeMir(PeekStageOptimizeMir), + LinearizeTimestamp(PeekStageLinearizeTimestamp), RealTimeRecency(PeekStageRealTimeRecency), + TimestampReadHold(PeekStageTimestampReadHold), + OptimizeMir(PeekStageOptimizeMir), OptimizeLir(PeekStageOptimizeLir), Finish(PeekStageFinish), Explain(PeekStageExplain), @@ -404,9 +403,10 @@ impl PeekStage { fn validity(&mut self) -> Option<&mut PlanValidity> { match self { PeekStage::Validate(_) => None, - PeekStage::Timestamp(PeekStageTimestamp { validity, .. }) - | PeekStage::OptimizeMir(PeekStageOptimizeMir { validity, .. }) + PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp { validity, .. }) | PeekStage::RealTimeRecency(PeekStageRealTimeRecency { validity, .. }) + | PeekStage::TimestampReadHold(PeekStageTimestampReadHold { validity, .. }) + | PeekStage::OptimizeMir(PeekStageOptimizeMir { validity, .. }) | PeekStage::OptimizeLir(PeekStageOptimizeLir { validity, .. }) | PeekStage::Finish(PeekStageFinish { validity, .. }) => Some(validity), PeekStage::Explain(PeekStageExplain { validity, .. }) => Some(validity), @@ -418,50 +418,54 @@ impl PeekStage { pub struct PeekStageValidate { plan: mz_sql::plan::SelectPlan, target_cluster: TargetCluster, - /// Context from where this peek initiated. peek_ctx: PeekContext, } #[derive(Debug)] -pub struct PeekStageTimestamp { +pub struct PeekStageLinearizeTimestamp { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, source_ids: BTreeSet, target_replica: Option, timeline_context: TimelineContext, - in_immediate_multi_stmt_txn: bool, optimizer: optimize::peek::Optimizer, - /// Context from where this peek initiated. peek_ctx: PeekContext, } #[derive(Debug)] -pub struct PeekStageOptimizeMir { +pub struct PeekStageRealTimeRecency { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, source_ids: BTreeSet, target_replica: Option, timeline_context: TimelineContext, oracle_read_ts: Option, - in_immediate_multi_stmt_txn: bool, optimizer: optimize::peek::Optimizer, - /// Context from where this peek initiated. peek_ctx: PeekContext, } #[derive(Debug)] -pub struct PeekStageRealTimeRecency { +pub struct PeekStageTimestampReadHold { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, source_ids: BTreeSet, - id_bundle: CollectionIdBundle, target_replica: Option, timeline_context: TimelineContext, oracle_read_ts: Option, - in_immediate_multi_stmt_txn: bool, + real_time_recency_ts: Option, + optimizer: optimize::peek::Optimizer, + peek_ctx: PeekContext, +} + +#[derive(Debug)] +pub struct PeekStageOptimizeMir { + validity: PlanValidity, + plan: mz_sql::plan::SelectPlan, + source_ids: BTreeSet, + id_bundle: CollectionIdBundle, + target_replica: Option, + determination: TimestampDetermination, optimizer: optimize::peek::Optimizer, - global_mir_plan: optimize::peek::GlobalMirPlan, - /// Context from where this peek initiated. peek_ctx: PeekContext, } @@ -469,15 +473,12 @@ pub struct PeekStageRealTimeRecency { pub struct PeekStageOptimizeLir { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, - id_bundle: Option, + id_bundle: CollectionIdBundle, target_replica: Option, - timeline_context: TimelineContext, - oracle_read_ts: Option, + determination: TimestampDetermination, source_ids: BTreeSet, - real_time_recency_ts: Option, optimizer: optimize::peek::Optimizer, global_mir_plan: optimize::peek::GlobalMirPlan, - /// Context from where this peek initiated. peek_ctx: PeekContext, } @@ -489,7 +490,6 @@ pub struct PeekStageFinish { target_replica: Option, source_ids: BTreeSet, determination: TimestampDetermination, - timestamp_context: TimestampContext, optimizer: optimize::peek::Optimizer, global_lir_plan: optimize::peek::GlobalLirPlan, } diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index 1e148cfec75aa..543561306c7df 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -34,7 +34,7 @@ use crate::command::Command; use crate::coord::appends::Deferred; use crate::coord::statement_logging::StatementLoggingId; use crate::coord::{ - Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageOptimizeLir, + Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageTimestampReadHold, PendingReadTxn, PlanValidity, PurifiedStatementReady, RealTimeRecencyContext, }; use crate::session::Session; @@ -854,25 +854,21 @@ impl Coordinator { timeline_context, oracle_read_ts, source_ids, - in_immediate_multi_stmt_txn: _, optimizer, - global_mir_plan, peek_ctx, } => { self.execute_peek_stage( ctx, root_otel_ctx, - PeekStage::OptimizeLir(PeekStageOptimizeLir { + PeekStage::TimestampReadHold(PeekStageTimestampReadHold { validity, plan, - id_bundle: None, target_replica, timeline_context, oracle_read_ts, source_ids, real_time_recency_ts: Some(real_time_recency_ts), optimizer, - global_mir_plan, peek_ctx, }), ) diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index d7374ffc9e01b..dc481abaa1b89 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -4199,7 +4199,7 @@ impl Coordinator { &self, session: &Session, source_ids: &BTreeSet, - query_as_of: Antichain, + query_as_of: &Antichain, is_oneshot: bool, ) -> Result, AdapterError> { if !session.vars().enable_session_cardinality_estimates() { @@ -4217,7 +4217,7 @@ impl Coordinator { let cached_stats = mz_ore::future::timeout( timeout, - CachedStatisticsOracle::new(source_ids, &query_as_of, self.controller.storage.as_ref()), + CachedStatisticsOracle::new(source_ids, query_as_of, self.controller.storage.as_ref()), ) .await; diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 4ce992340d99e..cdc71846994b3 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ -37,8 +37,9 @@ use crate::coord::timestamp_selection::{ }; use crate::coord::{ Coordinator, ExecuteContext, ExplainContext, Message, PeekContext, PeekStage, PeekStageExplain, - PeekStageFinish, PeekStageOptimizeLir, PeekStageOptimizeMir, PeekStageRealTimeRecency, - PeekStageTimestamp, PeekStageValidate, PlanValidity, RealTimeRecencyContext, TargetCluster, + PeekStageFinish, PeekStageLinearizeTimestamp, PeekStageOptimizeLir, PeekStageOptimizeMir, + PeekStageRealTimeRecency, PeekStageTimestampReadHold, PeekStageValidate, PlanValidity, + RealTimeRecencyContext, TargetCluster, }; use crate::error::AdapterError; use crate::explain::optimizer_trace::OptimizerTrace; @@ -149,24 +150,32 @@ impl Coordinator { let next = return_if_err!(self.peek_stage_validate(ctx.session_mut(), stage), ctx); - (ctx, PeekStage::Timestamp(next)) + (ctx, PeekStage::LinearizeTimestamp(next)) } - Timestamp(stage) => { - self.peek_stage_timestamp(ctx, root_otel_ctx.clone(), stage) - .await; - return; - } - OptimizeMir(stage) => { - self.peek_stage_optimize_mir(ctx, root_otel_ctx.clone(), stage) + LinearizeTimestamp(stage) => { + self.peek_stage_linearize_timestamp(ctx, root_otel_ctx.clone(), stage) .await; return; } RealTimeRecency(stage) => { match self.peek_stage_real_time_recency(ctx, root_otel_ctx.clone(), stage) { - Some((ctx, next)) => (ctx, PeekStage::OptimizeLir(next)), + Some((ctx, next)) => (ctx, PeekStage::TimestampReadHold(next)), None => return, } } + TimestampReadHold(stage) => { + let next = return_if_err!( + self.peek_stage_timestamp_read_hold(ctx.session_mut(), stage) + .await, + ctx + ); + (ctx, PeekStage::OptimizeMir(next)) + } + OptimizeMir(stage) => { + self.peek_stage_optimize_mir(ctx, root_otel_ctx.clone(), stage) + .await; + return; + } OptimizeLir(stage) => { self.peek_stage_optimize_lir(ctx, root_otel_ctx.clone(), stage) .await; @@ -196,7 +205,7 @@ impl Coordinator { target_cluster, peek_ctx, }: PeekStageValidate, - ) -> Result { + ) -> Result { // Collect optimizer parameters. let catalog = self.owned_catalog(); let cluster = catalog.resolve_target_cluster(target_cluster, session)?; @@ -249,8 +258,6 @@ impl Coordinator { // required because `source_ids` doesn't contain functions. timeline_context = TimelineContext::TimestampDependent; } - let in_immediate_multi_stmt_txn = session.transaction().is_in_multi_statement_transaction() - && plan.when == QueryWhen::Immediately; let notices = check_log_reads( &catalog, @@ -269,35 +276,32 @@ impl Coordinator { role_metadata: session.role_metadata().clone(), }; - Ok(PeekStageTimestamp { + Ok(PeekStageLinearizeTimestamp { validity, plan, source_ids, target_replica, timeline_context, - in_immediate_multi_stmt_txn, optimizer, peek_ctx, }) } - /// Determine a linearized read timestamp (from a `TimestampOracle`), if - /// needed. + /// Possibly linearize a timestamp from a `TimestampOracle`. #[tracing::instrument(level = "debug", skip_all)] - async fn peek_stage_timestamp( + async fn peek_stage_linearize_timestamp( &mut self, ctx: ExecuteContext, root_otel_ctx: OpenTelemetryContext, - PeekStageTimestamp { + PeekStageLinearizeTimestamp { validity, source_ids, plan, target_replica, timeline_context, - in_immediate_multi_stmt_txn, optimizer, peek_ctx, - }: PeekStageTimestamp, + }: PeekStageLinearizeTimestamp, ) { let isolation_level = ctx.session.vars().transaction_isolation().clone(); let linearized_timeline = @@ -305,20 +309,16 @@ impl Coordinator { let internal_cmd_tx = self.internal_cmd_tx.clone(); - let build_optimize_stage = - move |oracle_read_ts: Option| -> PeekStageOptimizeMir { - PeekStageOptimizeMir { - validity, - plan, - source_ids, - target_replica, - timeline_context, - oracle_read_ts, - in_immediate_multi_stmt_txn, - optimizer, - peek_ctx, - } - }; + let build_stage = move |oracle_read_ts: Option| PeekStageRealTimeRecency { + validity, + plan, + source_ids, + target_replica, + timeline_context, + oracle_read_ts, + optimizer, + peek_ctx, + }; match linearized_timeline { Some(timeline) => { @@ -331,9 +331,9 @@ impl Coordinator { let span = tracing::debug_span!("linearized timestamp task"); mz_ore::task::spawn(|| "linearized timestamp task", async move { let oracle_read_ts = shared_oracle.read_ts().instrument(span).await; - let stage = build_optimize_stage(Some(oracle_read_ts)); + let stage = build_stage(Some(oracle_read_ts)); - let stage = PeekStage::OptimizeMir(stage); + let stage = PeekStage::RealTimeRecency(stage); // Ignore errors if the coordinator has shut down. let _ = internal_cmd_tx.send(Message::PeekStageReady { ctx, @@ -346,9 +346,9 @@ impl Coordinator { // have to do it here. let oracle = self.get_timestamp_oracle(&timeline); let oracle_read_ts = oracle.read_ts().await; - let stage = build_optimize_stage(Some(oracle_read_ts)); + let stage = build_stage(Some(oracle_read_ts)); - let stage = PeekStage::OptimizeMir(stage); + let stage = PeekStage::RealTimeRecency(stage); // Ignore errors if the coordinator has shut down. let _ = internal_cmd_tx.send(Message::PeekStageReady { ctx, @@ -358,8 +358,8 @@ impl Coordinator { } } None => { - let stage = build_optimize_stage(None); - let stage = PeekStage::OptimizeMir(stage); + let stage = build_stage(None); + let stage = PeekStage::RealTimeRecency(stage); // Ignore errors if the coordinator has shut down. let _ = internal_cmd_tx.send(Message::PeekStageReady { ctx, @@ -370,19 +370,69 @@ impl Coordinator { } } + /// Determine a read timestamp and create appropriate read holds. + #[tracing::instrument(level = "debug", skip_all)] + async fn peek_stage_timestamp_read_hold( + &mut self, + session: &mut Session, + PeekStageTimestampReadHold { + mut validity, + plan, + source_ids, + target_replica, + timeline_context, + oracle_read_ts, + real_time_recency_ts, + optimizer, + peek_ctx, + }: PeekStageTimestampReadHold, + ) -> Result { + let id_bundle = self + .dataflow_builder(optimizer.cluster_id()) + .sufficient_collections(&source_ids); + + // Although we have added `sources.depends_on()` to the validity already, also add the + // sufficient collections for safety. + validity.dependency_ids.extend(id_bundle.iter()); + + let determination = self + .sequence_peek_timestamp( + session, + &plan.when, + optimizer.cluster_id(), + timeline_context, + oracle_read_ts, + &id_bundle, + &source_ids, + real_time_recency_ts, + (&peek_ctx).into(), + ) + .await?; + + Ok(PeekStageOptimizeMir { + validity, + plan, + source_ids, + id_bundle, + target_replica, + determination, + optimizer, + peek_ctx, + }) + } + #[tracing::instrument(level = "debug", skip_all)] async fn peek_stage_optimize_mir( &mut self, ctx: ExecuteContext, root_otel_ctx: OpenTelemetryContext, PeekStageOptimizeMir { - mut validity, + validity, plan, source_ids, + id_bundle, target_replica, - timeline_context, - oracle_read_ts, - in_immediate_multi_stmt_txn, + determination, mut optimizer, peek_ctx, }: PeekStageOptimizeMir, @@ -391,40 +441,15 @@ impl Coordinator { // expensive optimizations. let internal_cmd_tx = self.internal_cmd_tx.clone(); - // TODO: Is there a way to avoid making two dataflow_builders (the second is in - // optimize_peek)? - let id_bundle = self - .dataflow_builder(optimizer.cluster_id()) - .sufficient_collections(&source_ids); - // Although we have added `sources.depends_on()` to the validity already, also add the - // sufficient collections for safety. - validity.dependency_ids.extend(id_bundle.iter()); - - let stats = { - match self - .determine_timestamp( - ctx.session(), - &id_bundle, - &plan.when, - optimizer.cluster_id(), - &timeline_context, - oracle_read_ts.clone(), - None, - ) - .await - { - Err(_) => Box::new(EmptyStatisticsOracle), - Ok(query_as_of) => self - .statistics_oracle( - ctx.session(), - &source_ids, - query_as_of.timestamp_context.antichain(), - true, - ) - .await - .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle)), - } - }; + let stats = self + .statistics_oracle( + ctx.session(), + &source_ids, + &determination.timestamp_context.antichain(), + true, + ) + .await + .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle)); mz_ore::task::spawn_blocking( || "optimize peek (MIR)", @@ -453,18 +478,16 @@ impl Coordinator { }; let stage = match pipeline() { - Ok(global_mir_plan) => PeekStage::RealTimeRecency(PeekStageRealTimeRecency { + Ok(global_mir_plan) => PeekStage::OptimizeLir(PeekStageOptimizeLir { validity, plan, source_ids, id_bundle, target_replica, - timeline_context, - oracle_read_ts, - in_immediate_multi_stmt_txn, optimizer, global_mir_plan, peek_ctx, + determination, }), // Internal optimizer errors are handled differently // depending on the caller. @@ -514,16 +537,13 @@ impl Coordinator { validity, plan, source_ids, - id_bundle, target_replica, timeline_context, oracle_read_ts, - in_immediate_multi_stmt_txn, optimizer, - global_mir_plan, peek_ctx, }: PeekStageRealTimeRecency, - ) -> Option<(ExecuteContext, PeekStageOptimizeLir)> { + ) -> Option<(ExecuteContext, PeekStageTimestampReadHold)> { match self.recent_timestamp(ctx.session(), source_ids.iter().cloned()) { Some(fut) => { let internal_cmd_tx = self.internal_cmd_tx.clone(); @@ -538,9 +558,7 @@ impl Coordinator { timeline_context, oracle_read_ts: oracle_read_ts.clone(), source_ids, - in_immediate_multi_stmt_txn, optimizer, - global_mir_plan, peek_ctx, }, ); @@ -560,18 +578,16 @@ impl Coordinator { } None => Some(( ctx, - PeekStageOptimizeLir { + PeekStageTimestampReadHold { validity, plan, - id_bundle: Some(id_bundle), target_replica, timeline_context, - oracle_read_ts, source_ids, - real_time_recency_ts: None, optimizer, - global_mir_plan, peek_ctx, + oracle_read_ts, + real_time_recency_ts: None, }, )), } @@ -587,10 +603,8 @@ impl Coordinator { plan, id_bundle, target_replica, - timeline_context, - oracle_read_ts, + determination, source_ids, - real_time_recency_ts, mut optimizer, global_mir_plan, peek_ctx, @@ -600,26 +614,6 @@ impl Coordinator { // expensive optimizations. let internal_cmd_tx = self.internal_cmd_tx.clone(); - let id_bundle = id_bundle.unwrap_or_else(|| { - self.index_oracle(optimizer.cluster_id()) - .sufficient_collections(&source_ids) - }); - - let determination = self - .sequence_peek_timestamp( - ctx.session_mut(), - &plan.when, - optimizer.cluster_id(), - timeline_context, - oracle_read_ts, - &id_bundle, - &source_ids, - real_time_recency_ts, - (&peek_ctx).into(), - ) - .await; - - let determination = return_if_err!(determination, ctx); let timestamp_context = determination.clone().timestamp_context; mz_ore::task::spawn_blocking( @@ -693,7 +687,6 @@ impl Coordinator { target_replica, source_ids, determination, - timestamp_context, optimizer, global_lir_plan, }), @@ -749,7 +742,6 @@ impl Coordinator { target_replica, source_ids, determination, - timestamp_context, optimizer, global_lir_plan, }: PeekStageFinish, @@ -780,7 +772,7 @@ impl Coordinator { } if let Some(uuid) = ctx.extra().contents() { - let ts = timestamp_context.timestamp_or_default(); + let ts = determination.timestamp_context.timestamp_or_default(); let mut transitive_storage_deps = BTreeSet::new(); let mut transitive_compute_deps = BTreeSet::new(); for id in id_bundle