Skip to content

Commit

Permalink
adapter: refactor peek timestamps
Browse files Browse the repository at this point in the history
Refactor peek timestamps. The previous problems were:

- Optimize stats determined their own timestamp
  early on, and was possibly not the final
  timestamp. This doesn't matter hugely, but also
  duplicated a bit of work.
- We were not able to fill in the final timestamp
  until very late in LIR optimization, missing
  potential optimization opportunities and doing
  the scalar walk twice.
- It was generally confusing when timestamps were
  acquired because we also have realtime and
  linear timestamps.

Fix these by adding a new stage, and moving and
renaming other stages. The new stage does the
final timestamp determination and acquires read
holds. Now all three timestamp stages happen first
(linearization, realtime recency, final
determination and read holds), then both optimizer
stages, then final execution.

We did not do this before because we always
delayed getting read holds until some end step
with no off-thread work after it. Now that we
always acquire read holds immediately upon getting
a timestamp, this is no longer a worry.

In a future PR we could now maybe merge the
optimization stages, and remove the double scalar
expr prep.
  • Loading branch information
maddyblue committed Feb 2, 2024
1 parent a9888e3 commit cdb1889
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 153 deletions.
48 changes: 24 additions & 24 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ pub enum RealTimeRecencyContext {
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_ids: BTreeSet<GlobalId>,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
global_mir_plan: optimize::peek::GlobalMirPlan,
peek_ctx: PeekContext,
},
}
Expand All @@ -392,9 +390,10 @@ pub enum PeekContext {
#[derive(Debug)]
pub enum PeekStage {
Validate(PeekStageValidate),
Timestamp(PeekStageTimestamp),
OptimizeMir(PeekStageOptimizeMir),
LinearizeTimestamp(PeekStageLinearizeTimestamp),
RealTimeRecency(PeekStageRealTimeRecency),
TimestampReadHold(PeekStageTimestampReadHold),
OptimizeMir(PeekStageOptimizeMir),
OptimizeLir(PeekStageOptimizeLir),
Finish(PeekStageFinish),
Explain(PeekStageExplain),
Expand All @@ -404,9 +403,10 @@ impl PeekStage {
fn validity(&mut self) -> Option<&mut PlanValidity> {
match self {
PeekStage::Validate(_) => None,
PeekStage::Timestamp(PeekStageTimestamp { validity, .. })
| PeekStage::OptimizeMir(PeekStageOptimizeMir { validity, .. })
PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp { validity, .. })
| PeekStage::RealTimeRecency(PeekStageRealTimeRecency { validity, .. })
| PeekStage::TimestampReadHold(PeekStageTimestampReadHold { validity, .. })
| PeekStage::OptimizeMir(PeekStageOptimizeMir { validity, .. })
| PeekStage::OptimizeLir(PeekStageOptimizeLir { validity, .. })
| PeekStage::Finish(PeekStageFinish { validity, .. }) => Some(validity),
PeekStage::Explain(PeekStageExplain { validity, .. }) => Some(validity),
Expand All @@ -418,66 +418,67 @@ impl PeekStage {
pub struct PeekStageValidate {
plan: mz_sql::plan::SelectPlan,
target_cluster: TargetCluster,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageTimestamp {
pub struct PeekStageLinearizeTimestamp {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageOptimizeMir {
pub struct PeekStageRealTimeRecency {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
pub struct PeekStageTimestampReadHold {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
real_time_recency_ts: Option<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageOptimizeMir {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
determination: TimestampDetermination<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
global_mir_plan: optimize::peek::GlobalMirPlan,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageOptimizeLir {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
id_bundle: Option<CollectionIdBundle>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
determination: TimestampDetermination<mz_repr::Timestamp>,
source_ids: BTreeSet<GlobalId>,
real_time_recency_ts: Option<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
global_mir_plan: optimize::peek::GlobalMirPlan,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

Expand All @@ -489,7 +490,6 @@ pub struct PeekStageFinish {
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
determination: TimestampDetermination<mz_repr::Timestamp>,
timestamp_context: TimestampContext<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
global_lir_plan: optimize::peek::GlobalLirPlan,
}
Expand Down
8 changes: 2 additions & 6 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::command::Command;
use crate::coord::appends::Deferred;
use crate::coord::statement_logging::StatementLoggingId;
use crate::coord::{
Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageOptimizeLir,
Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageTimestampReadHold,
PendingReadTxn, PlanValidity, PurifiedStatementReady, RealTimeRecencyContext,
};
use crate::session::Session;
Expand Down Expand Up @@ -854,25 +854,21 @@ impl Coordinator {
timeline_context,
oracle_read_ts,
source_ids,
in_immediate_multi_stmt_txn: _,
optimizer,
global_mir_plan,
peek_ctx,
} => {
self.execute_peek_stage(
ctx,
root_otel_ctx,
PeekStage::OptimizeLir(PeekStageOptimizeLir {
PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
validity,
plan,
id_bundle: None,
target_replica,
timeline_context,
oracle_read_ts,
source_ids,
real_time_recency_ts: Some(real_time_recency_ts),
optimizer,
global_mir_plan,
peek_ctx,
}),
)
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4199,7 +4199,7 @@ impl Coordinator {
&self,
session: &Session,
source_ids: &BTreeSet<GlobalId>,
query_as_of: Antichain<Timestamp>,
query_as_of: &Antichain<Timestamp>,
is_oneshot: bool,
) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
if !session.vars().enable_session_cardinality_estimates() {
Expand All @@ -4217,7 +4217,7 @@ impl Coordinator {

let cached_stats = mz_ore::future::timeout(
timeout,
CachedStatisticsOracle::new(source_ids, &query_as_of, self.controller.storage.as_ref()),
CachedStatisticsOracle::new(source_ids, query_as_of, self.controller.storage.as_ref()),
)
.await;

Expand Down
Loading

0 comments on commit cdb1889

Please sign in to comment.