feat(snapshot): persist snapshot backfill epoch in fragment stream node (#19751)
wenym1 authored Dec 25, 2024
1 parent ef5835e commit ae48d9d
Showing 9 changed files with 208 additions and 54 deletions.
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
@@ -627,6 +627,8 @@ message StreamScanNode {
// The state table used by ArrangementBackfill to replicate upstream mview's state table.
// Used iff `ChainType::ArrangementBackfill`.
catalog.Table arrangement_table = 10;

optional uint64 snapshot_backfill_epoch = 11;
}

// Config options for CDC backfill
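For context (not part of the commit): the new `optional uint64` field surfaces as `Option<u64>` in the prost-generated Rust type, which is what the set-once checks later in this commit rely on. A minimal sketch, with a hypothetical helper name:

// Illustrative sketch, not part of the commit.
fn set_backfill_epoch_once(
    stream_scan: &mut risingwave_pb::stream_plan::StreamScanNode,
    epoch: u64,
) {
    // The field is unset when the fragment graph is first built...
    assert!(stream_scan.snapshot_backfill_epoch.is_none(), "epoch already set");
    // ...and is filled exactly once by the global barrier worker.
    stream_scan.snapshot_backfill_epoch = Some(epoch);
}
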
41 changes: 35 additions & 6 deletions src/meta/src/barrier/checkpoint/control.rs
@@ -50,6 +50,7 @@ use crate::barrier::{
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

#[derive(Default)]
@@ -695,8 +696,10 @@ impl DatabaseCheckpointControl {
progress_epoch,
creating_job
.snapshot_backfill_info
.upstream_mv_table_ids
.clone(),
.upstream_mv_table_id_to_backfill_epoch
.keys()
.cloned()
.collect(),
),
)
})
@@ -977,7 +980,7 @@ impl DatabaseCheckpointControl {
if let Some(Command::CreateStreamingJob {
job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
info,
}) = &command
}) = &mut command
{
if self.state.paused_reason().is_some() {
warn!("cannot create streaming job with snapshot backfill when paused");
@@ -989,18 +992,44 @@
}
return Ok(());
}
// set snapshot epoch of upstream table for snapshot backfill
for snapshot_backfill_epoch in snapshot_backfill_info
.upstream_mv_table_id_to_backfill_epoch
.values_mut()
{
assert!(
snapshot_backfill_epoch
.replace(barrier_info.prev_epoch())
.is_none(),
"must not set previously"
);
}
for stream_actor in info
.stream_job_fragments
.fragments
.values_mut()
.flat_map(|fragment| fragment.actors.iter_mut())
{
fill_snapshot_backfill_epoch(
stream_actor.nodes.as_mut().expect("should exist"),
&snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch,
)?;
}
let info = info.clone();
let job_id = info.stream_job_fragments.stream_job_id();
let snapshot_backfill_info = snapshot_backfill_info.clone();
let mutation = command
.as_ref()
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
let job_id = info.stream_job_fragments.stream_job_id();

control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
job_id,
CreatingStreamingJobControl::new(
info.clone(),
snapshot_backfill_info.clone(),
info,
snapshot_backfill_info,
barrier_info.prev_epoch(),
hummock_version_stats,
mutation,
18 changes: 11 additions & 7 deletions src/meta/src/barrier/command.rs
Expand Up @@ -227,7 +227,10 @@ impl CreateStreamingJobCommandInfo {

#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
pub upstream_mv_table_ids: HashSet<TableId>,
/// `table_id` -> `Option<snapshot_backfill_epoch>`
/// The `snapshot_backfill_epoch` is `None` when the command is first created, and is
/// filled in by the global barrier worker when handling the command.
pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}

#[derive(Debug, Clone)]
@@ -697,8 +700,8 @@ impl Command {
job_type
{
snapshot_backfill_info
.upstream_mv_table_ids
.iter()
.upstream_mv_table_id_to_backfill_epoch
.keys()
.map(|table_id| SubscriptionUpstreamInfo {
subscriber_id: table_fragments.stream_job_id().table_id,
upstream_mv_table_id: table_id.table_id,
@@ -747,12 +750,13 @@ impl Command {
info: jobs_to_merge
.iter()
.flat_map(|(table_id, (backfill_info, _))| {
backfill_info.upstream_mv_table_ids.iter().map(
move |upstream_table_id| SubscriptionUpstreamInfo {
backfill_info
.upstream_mv_table_id_to_backfill_epoch
.keys()
.map(move |upstream_table_id| SubscriptionUpstreamInfo {
subscriber_id: table_id.table_id,
upstream_mv_table_id: upstream_table_id.table_id,
},
)
})
})
.collect(),
}))
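A minimal sketch of the new map's lifecycle, with illustrative values: each upstream mv table starts with `None`, and the global barrier worker later fills every entry with the barrier's `prev_epoch`, asserting that it was not set before.

// Illustrative sketch, not part of the commit.
use std::collections::HashMap;
use risingwave_common::catalog::TableId;

fn epoch_lifecycle_sketch() {
    // Built together with the fragment graph: no epoch decided yet.
    let mut epochs: HashMap<TableId, Option<u64>> =
        HashMap::from_iter([(TableId::new(1), None)]);

    // Decided by the global barrier worker when handling the CreateStreamingJob command.
    let prev_epoch: u64 = 100; // illustrative value
    for epoch in epochs.values_mut() {
        assert!(epoch.replace(prev_epoch).is_none(), "must not be set previously");
    }
}
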
77 changes: 51 additions & 26 deletions src/meta/src/barrier/context/context_impl.rs
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::PbFragmentTypeFlag;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use risingwave_rpc_client::StreamingControlHandle;
@@ -178,8 +179,57 @@ impl CommandContext {
.unregister_table_ids(unregistered_state_table_ids.iter().cloned())
.await?;
}

Command::CreateStreamingJob { info, job_type } => {
let mut fragment_replacements = None;
let mut dropped_actors = None;
match job_type {
CreateStreamingJobType::SinkIntoTable(
replace_plan @ ReplaceStreamJobPlan {
new_fragments,
dispatchers,
init_split_assignment,
..
},
) => {
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_job_fragments(
new_fragments.stream_job_id().table_id as _,
new_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
fragment_replacements = Some(replace_plan.fragment_replacements());
dropped_actors = Some(replace_plan.dropped_actors());
}
CreateStreamingJobType::Normal => {}
CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
barrier_manager_context
.metadata_manager
.catalog_controller
.fill_snapshot_backfill_epoch(
info.stream_job_fragments.fragments.iter().filter_map(
|(fragment_id, fragment)| {
if (fragment.fragment_type_mask
& PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32)
!= 0
{
Some(*fragment_id as _)
} else {
None
}
},
),
&snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch,
)
.await?
}
}

// Do `post_collect_job_fragments` of the original streaming job at the end, so that if any previous step fails,
// we won't have marked the job as `Creating`, and the job will later be cleaned up by the recovery triggered by the returned error.
let CreateStreamingJobCommandInfo {
stream_job_fragments,
dispatchers,
Expand All @@ -197,31 +247,6 @@ impl CommandContext {
)
.await?;

let mut fragment_replacements = None;
let mut dropped_actors = None;
if let CreateStreamingJobType::SinkIntoTable(
replace_plan @ ReplaceStreamJobPlan {
new_fragments,
dispatchers,
init_split_assignment,
..
},
) = job_type
{
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_job_fragments(
new_fragments.stream_job_id().table_id as _,
new_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
fragment_replacements = Some(replace_plan.fragment_replacements());
dropped_actors = Some(replace_plan.dropped_actors());
}

// Extract the fragments that include source operators.
let source_fragments = stream_job_fragments.stream_source_fragments();
let backfill_fragments = stream_job_fragments.source_backfill_fragments()?;
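The fragment filter above selects snapshot-backfill stream-scan fragments via the fragment type bitmask; the same check as a standalone helper (name hypothetical):

// Illustrative sketch, not part of the commit.
use risingwave_pb::stream_plan::PbFragmentTypeFlag;

fn is_snapshot_backfill_scan_fragment(fragment_type_mask: u32) -> bool {
    (fragment_type_mask & PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
}
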
28 changes: 28 additions & 0 deletions src/meta/src/controller/fragment.rs
Expand Up @@ -1293,6 +1293,34 @@ impl CatalogController {
Ok(())
}

pub async fn fill_snapshot_backfill_epoch(
&self,
fragment_ids: impl Iterator<Item = FragmentId>,
upstream_mv_snapshot_epoch: &HashMap<risingwave_common::catalog::TableId, Option<u64>>,
) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
for fragment_id in fragment_ids {
let fragment = Fragment::find_by_id(fragment_id)
.one(&txn)
.await?
.context(format!("fragment {} not found", fragment_id))?;
let mut node = fragment.stream_node.to_protobuf();
if crate::stream::fill_snapshot_backfill_epoch(&mut node, upstream_mv_snapshot_epoch)? {
let node = StreamNode::from(&node);
Fragment::update(fragment::ActiveModel {
fragment_id: Set(fragment_id),
stream_node: Set(node),
..Default::default()
})
.exec(&txn)
.await?;
}
}
txn.commit().await?;
Ok(())
}

/// Get the actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_of_fragment(
&self,
4 changes: 3 additions & 1 deletion src/meta/src/stream/stream_graph.rs
Expand Up @@ -18,5 +18,7 @@ mod id;
mod schedule;

pub use actor::{ActorGraphBuildResult, ActorGraphBuilder};
pub use fragment::{CompleteStreamFragmentGraph, StreamFragmentGraph};
pub use fragment::{
fill_snapshot_backfill_epoch, CompleteStreamFragmentGraph, StreamFragmentGraph,
};
pub use schedule::Locations;
54 changes: 47 additions & 7 deletions src/meta/src/stream/stream_graph/fragment.rs
Expand Up @@ -27,7 +27,9 @@ use risingwave_common::catalog::{
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::stream_graph_visitor::{
visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::TableJobType;
@@ -38,7 +40,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::{
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
DispatchStrategy, DispatcherType, FragmentTypeFlag, StreamActor,
StreamFragmentGraph as StreamFragmentGraphProto, StreamScanNode, StreamScanType,
StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode, StreamScanType,
};

use crate::barrier::SnapshotBackfillInfo;
@@ -602,8 +604,8 @@ impl StreamFragmentGraph {
match (prev_snapshot_backfill_info, is_snapshot_backfill) {
(Some(prev_snapshot_backfill_info), true) => {
prev_snapshot_backfill_info
.upstream_mv_table_ids
.insert(TableId::new(stream_scan.table_id));
.upstream_mv_table_id_to_backfill_epoch
.insert(TableId::new(stream_scan.table_id), None);
true
}
(None, false) => true,
@@ -617,9 +619,9 @@
prev_stream_scan = Some((
if is_snapshot_backfill {
Some(SnapshotBackfillInfo {
upstream_mv_table_ids: HashSet::from_iter([TableId::new(
stream_scan.table_id,
)]),
upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
[(TableId::new(stream_scan.table_id), None)],
),
})
} else {
None
@@ -642,6 +644,44 @@
}
}

/// Fill the snapshot epoch for `StreamScanNode`s of type `SnapshotBackfill`.
/// Returns `true` if any change was applied.
pub fn fill_snapshot_backfill_epoch(
node: &mut StreamNode,
upstream_mv_table_snapshot_epoch: &HashMap<TableId, Option<u64>>,
) -> MetaResult<bool> {
let mut result = Ok(());
let mut applied = false;
visit_stream_node_cont_mut(node, |node| {
if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
&& stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
{
result = try {
let table_id = TableId::new(stream_scan.table_id);
let snapshot_epoch = upstream_mv_table_snapshot_epoch
.get(&table_id)
.ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
.ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
if let Some(prev_snapshot_epoch) =
stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
{
Err(anyhow!(
"snapshot backfill epoch set again: {} {} {}",
table_id,
prev_snapshot_epoch,
snapshot_epoch
))?;
}
applied = true;
};
result.is_ok()
} else {
true
}
});
result.map(|_| applied)
}

static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
LazyLock::new(HashMap::new);

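A hedged usage sketch of the newly exported helper; the surrounding plumbing (where the `StreamNode` and the epoch map come from) is hypothetical. The helper returns `Ok(true)` iff at least one `SnapshotBackfill` stream scan was rewritten, and errors if an epoch is missing or was already set.

// Illustrative sketch, not part of the commit.
use std::collections::HashMap;
use risingwave_common::catalog::TableId;
use risingwave_pb::stream_plan::StreamNode;

use crate::stream::fill_snapshot_backfill_epoch;
use crate::MetaResult;

fn apply_backfill_epoch_sketch(
    node: &mut StreamNode,
    upstream_table: TableId,
    backfill_epoch: u64,
) -> MetaResult<()> {
    // Every upstream mv table must have an epoch decided by this point.
    let epochs: HashMap<TableId, Option<u64>> =
        HashMap::from_iter([(upstream_table, Some(backfill_epoch))]);
    let applied = fill_snapshot_backfill_epoch(node, &epochs)?;
    if !applied {
        tracing::warn!("no snapshot-backfill stream scan found in this node tree");
    }
    Ok(())
}
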
(diffs for the remaining changed files were not loaded)