Skip to content

Commit

Permalink
Merge pull request #24930 from mjibson/refactor-peek-timestamp
Browse files Browse the repository at this point in the history
adapter: refactor peek timestamps
  • Loading branch information
maddyblue authored Feb 5, 2024
2 parents 1bf8044 + cdb1889 commit b2c3964
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 153 deletions.
48 changes: 24 additions & 24 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ pub enum RealTimeRecencyContext {
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_ids: BTreeSet<GlobalId>,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
global_mir_plan: optimize::peek::GlobalMirPlan,
peek_ctx: PeekContext,
},
}
Expand All @@ -392,9 +390,10 @@ pub enum PeekContext {
#[derive(Debug)]
pub enum PeekStage {
Validate(PeekStageValidate),
Timestamp(PeekStageTimestamp),
OptimizeMir(PeekStageOptimizeMir),
LinearizeTimestamp(PeekStageLinearizeTimestamp),
RealTimeRecency(PeekStageRealTimeRecency),
TimestampReadHold(PeekStageTimestampReadHold),
OptimizeMir(PeekStageOptimizeMir),
OptimizeLir(PeekStageOptimizeLir),
Finish(PeekStageFinish),
Explain(PeekStageExplain),
Expand All @@ -404,9 +403,10 @@ impl PeekStage {
fn validity(&mut self) -> Option<&mut PlanValidity> {
match self {
PeekStage::Validate(_) => None,
PeekStage::Timestamp(PeekStageTimestamp { validity, .. })
| PeekStage::OptimizeMir(PeekStageOptimizeMir { validity, .. })
PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp { validity, .. })
| PeekStage::RealTimeRecency(PeekStageRealTimeRecency { validity, .. })
| PeekStage::TimestampReadHold(PeekStageTimestampReadHold { validity, .. })
| PeekStage::OptimizeMir(PeekStageOptimizeMir { validity, .. })
| PeekStage::OptimizeLir(PeekStageOptimizeLir { validity, .. })
| PeekStage::Finish(PeekStageFinish { validity, .. }) => Some(validity),
PeekStage::Explain(PeekStageExplain { validity, .. }) => Some(validity),
Expand All @@ -418,66 +418,67 @@ impl PeekStage {
pub struct PeekStageValidate {
plan: mz_sql::plan::SelectPlan,
target_cluster: TargetCluster,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageTimestamp {
pub struct PeekStageLinearizeTimestamp {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageOptimizeMir {
pub struct PeekStageRealTimeRecency {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageRealTimeRecency {
pub struct PeekStageTimestampReadHold {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
real_time_recency_ts: Option<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageOptimizeMir {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
source_ids: BTreeSet<GlobalId>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
determination: TimestampDetermination<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
global_mir_plan: optimize::peek::GlobalMirPlan,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

#[derive(Debug)]
pub struct PeekStageOptimizeLir {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
id_bundle: Option<CollectionIdBundle>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
determination: TimestampDetermination<mz_repr::Timestamp>,
source_ids: BTreeSet<GlobalId>,
real_time_recency_ts: Option<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
global_mir_plan: optimize::peek::GlobalMirPlan,
/// Context from where this peek initiated.
peek_ctx: PeekContext,
}

Expand All @@ -489,7 +490,6 @@ pub struct PeekStageFinish {
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
determination: TimestampDetermination<mz_repr::Timestamp>,
timestamp_context: TimestampContext<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
global_lir_plan: optimize::peek::GlobalLirPlan,
}
Expand Down
8 changes: 2 additions & 6 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::command::Command;
use crate::coord::appends::Deferred;
use crate::coord::statement_logging::StatementLoggingId;
use crate::coord::{
Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageOptimizeLir,
Coordinator, CreateConnectionValidationReady, Message, PeekStage, PeekStageTimestampReadHold,
PendingReadTxn, PlanValidity, PurifiedStatementReady, RealTimeRecencyContext,
};
use crate::session::Session;
Expand Down Expand Up @@ -854,25 +854,21 @@ impl Coordinator {
timeline_context,
oracle_read_ts,
source_ids,
in_immediate_multi_stmt_txn: _,
optimizer,
global_mir_plan,
peek_ctx,
} => {
self.execute_peek_stage(
ctx,
root_otel_ctx,
PeekStage::OptimizeLir(PeekStageOptimizeLir {
PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
validity,
plan,
id_bundle: None,
target_replica,
timeline_context,
oracle_read_ts,
source_ids,
real_time_recency_ts: Some(real_time_recency_ts),
optimizer,
global_mir_plan,
peek_ctx,
}),
)
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4209,7 +4209,7 @@ impl Coordinator {
&self,
session: &Session,
source_ids: &BTreeSet<GlobalId>,
query_as_of: Antichain<Timestamp>,
query_as_of: &Antichain<Timestamp>,
is_oneshot: bool,
) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
if !session.vars().enable_session_cardinality_estimates() {
Expand All @@ -4227,7 +4227,7 @@ impl Coordinator {

let cached_stats = mz_ore::future::timeout(
timeout,
CachedStatisticsOracle::new(source_ids, &query_as_of, self.controller.storage.as_ref()),
CachedStatisticsOracle::new(source_ids, query_as_of, self.controller.storage.as_ref()),
)
.await;

Expand Down
Loading

0 comments on commit b2c3964

Please sign in to comment.