From 00d90c8f52ccbed991e294d4d556320e9fac153e Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 8 Jan 2025 14:09:02 +0800 Subject: [PATCH] refactor(meta): splits source_manager into smaller mods Signed-off-by: xxchan --- src/meta/src/rpc/ddl_controller.rs | 4 +- src/meta/src/stream/source_manager.rs | 1131 +---------------- .../stream/source_manager/split_assignment.rs | 830 ++++++++++++ src/meta/src/stream/source_manager/worker.rs | 320 +++++ 4 files changed, 1159 insertions(+), 1126 deletions(-) create mode 100644 src/meta/src/stream/source_manager/split_assignment.rs create mode 100644 src/meta/src/stream/source_manager/worker.rs diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 70419e6118b13..5c203df91748f 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -75,7 +75,7 @@ use crate::manager::{ }; use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::{ - create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, + create_source_worker, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef, ReplaceStreamJobContext, SourceChange, SourceManagerRef, StreamFragmentGraph, @@ -441,7 +441,7 @@ impl DdlController { /// Shared source is handled in [`Self::create_streaming_job`] async fn create_non_shared_source(&self, source: Source) -> MetaResult { - let handle = create_source_worker_handle(&source, self.source_manager.metrics.clone()) + let handle = create_source_worker(&source, self.source_manager.metrics.clone()) .await .context("failed to create source worker")?; diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 958ce8a802bfc..6d2ea50d0a5cd 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod split_assignment; +mod worker; use std::borrow::BorrowMut; use std::cmp::Ordering; use std::collections::hash_map::Entry; @@ -40,6 +42,8 @@ use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; use tokio::{select, time}; +pub use worker::create_source_worker; +use worker::{create_source_worker_async, ConnectorSourceWorkerHandle}; use crate::barrier::{BarrierScheduler, Command}; use crate::manager::MetadataManager; @@ -51,7 +55,7 @@ pub type SourceManagerRef = Arc; pub type SplitAssignment = HashMap>>; pub type ThrottleConfig = HashMap>>; -/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`ConnectorSourceWorker::tick`]), +/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]), /// and sends a split assignment command if split changes detected ([`Self::tick`]). pub struct SourceManager { pub paused: Mutex<()>, @@ -59,226 +63,6 @@ pub struct SourceManager { core: Mutex, pub metrics: Arc, } - -const MAX_FAIL_CNT: u32 = 10; -const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10); - -struct SharedSplitMap { - splits: Option>, -} - -type SharedSplitMapRef = Arc>; - -/// `ConnectorSourceWorker` keeps fetching the latest split metadata from the external source service ([`Self::tick`]), -/// and maintains it in `current_splits`. 
-struct ConnectorSourceWorker<P: SourceProperties> {
-    source_id: SourceId,
-    source_name: String,
-    current_splits: SharedSplitMapRef,
-    enumerator: P::SplitEnumerator,
-    period: Duration,
-    metrics: Arc<MetaMetrics>,
-    connector_properties: P,
-    fail_cnt: u32,
-    source_is_up: LabelGuardedIntGauge<2>,
-}
-
-fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
-    let options_with_secret =
-        WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
-    let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
-    properties.init_from_pb_source(source);
-    Ok(properties)
-}
-fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
-    let options_with_secret =
-        WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
-    let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
-    properties.init_from_pb_source(source);
-    Ok(properties)
-}
-
-/// Used to create a new `ConnectorSourceWorkerHandle` for a new source.
-///
-/// It will call `ConnectorSourceWorker::tick()` to fetch split metadata once before returning.
-pub async fn create_source_worker_handle(
-    source: &Source,
-    metrics: Arc<MetaMetrics>,
-) -> MetaResult<ConnectorSourceWorkerHandle> {
-    tracing::info!("spawning new watcher for source {}", source.id);
-
-    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
-    let current_splits_ref = splits.clone();
-
-    let connector_properties = extract_prop_from_new_source(source)?;
-    let enable_scale_in = connector_properties.enable_drop_split();
-    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
-    let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
-    let handle = dispatch_source_prop!(connector_properties, prop, {
-        let mut worker = ConnectorSourceWorker::create(
-            source,
-            *prop,
-            DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
-            current_splits_ref.clone(),
-            metrics,
-        )
-        .await?;
-
-        // if fail to fetch meta info, will refuse to create source
-
-        // todo: make the timeout configurable, longer than `properties.sync.call.timeout`
-        // in kafka
-        tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick())
-            .await
-            .ok()
-            .with_context(|| {
-                format!(
-                    "failed to fetch meta info for source {}, timeout {:?}",
-                    source.id, DEFAULT_SOURCE_TICK_TIMEOUT
-                )
-            })??;
-
-        tokio::spawn(async move { worker.run(sync_call_rx).await })
-    });
-    Ok(ConnectorSourceWorkerHandle {
-        handle,
-        sync_call_tx,
-        splits,
-        enable_drop_split: enable_scale_in,
-        enable_adaptive_splits,
-    })
-}
-
-const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
-
-impl<P: SourceProperties> ConnectorSourceWorker<P> {
-    /// Recreate the `SplitEnumerator` to establish a new connection to the external source service.
-    async fn refresh(&mut self) -> MetaResult<()> {
-        let enumerator = P::SplitEnumerator::new(
-            self.connector_properties.clone(),
-            Arc::new(SourceEnumeratorContext {
-                metrics: self.metrics.source_enumerator_metrics.clone(),
-                info: SourceEnumeratorInfo {
-                    source_id: self.source_id as u32,
-                },
-            }),
-        )
-        .await
-        .context("failed to create SplitEnumerator")?;
-        self.enumerator = enumerator;
-        self.fail_cnt = 0;
-        tracing::info!("refreshed source enumerator: {}", self.source_name);
-        Ok(())
-    }
-
-    /// On creation, connection to the external source service will be established, but `splits`
-    /// will not be updated until `tick` is called.
-    pub async fn create(
-        source: &Source,
-        connector_properties: P,
-        period: Duration,
-        splits: Arc<Mutex<SharedSplitMap>>,
-        metrics: Arc<MetaMetrics>,
-    ) -> MetaResult<Self> {
-        let enumerator = P::SplitEnumerator::new(
-            connector_properties.clone(),
-            Arc::new(SourceEnumeratorContext {
-                metrics: metrics.source_enumerator_metrics.clone(),
-                info: SourceEnumeratorInfo {
-                    source_id: source.id,
-                },
-            }),
-        )
-        .await
-        .context("failed to create SplitEnumerator")?;
-
-        let source_is_up = metrics
-            .source_is_up
-            .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
-
-        Ok(Self {
-            source_id: source.id as SourceId,
-            source_name: source.name.clone(),
-            current_splits: splits,
-            enumerator,
-            period,
-            metrics,
-            connector_properties,
-            fail_cnt: 0,
-            source_is_up,
-        })
-    }
-
-    pub async fn run(
-        &mut self,
-        mut sync_call_rx: UnboundedReceiver<oneshot::Sender<MetaResult<()>>>,
-    ) {
-        let mut interval = time::interval(self.period);
-        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
-        loop {
-            select! {
-                biased;
-                tx = sync_call_rx.borrow_mut().recv() => {
-                    if let Some(tx) = tx {
-                        let _ = tx.send(self.tick().await);
-                    }
-                }
-                _ = interval.tick() => {
-                    if self.fail_cnt > MAX_FAIL_CNT {
-                        if let Err(e) = self.refresh().await {
-                            tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
-                        }
-                    }
-                    if let Err(e) = self.tick().await {
-                        tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
-                    }
-                }
-            }
-        }
-    }
-
-    /// Uses [`SplitEnumerator`] to fetch the latest split metadata from the external source service.
-    async fn tick(&mut self) -> MetaResult<()> {
-        let source_is_up = |res: i64| {
-            self.source_is_up.set(res);
-        };
-        let splits = self.enumerator.list_splits().await.inspect_err(|_| {
-            source_is_up(0);
-            self.fail_cnt += 1;
-        })?;
-        source_is_up(1);
-        self.fail_cnt = 0;
-        let mut current_splits = self.current_splits.lock().await;
-        current_splits.splits.replace(
-            splits
-                .into_iter()
-                .map(|split| (split.id(), P::Split::into(split)))
-                .collect(),
-        );
-
-        Ok(())
-    }
-}
-
-/// Handle for a running [`ConnectorSourceWorker`].
-pub struct ConnectorSourceWorkerHandle { - handle: JoinHandle<()>, - sync_call_tx: UnboundedSender>>, - splits: SharedSplitMapRef, - enable_drop_split: bool, - enable_adaptive_splits: bool, -} - -impl ConnectorSourceWorkerHandle { - async fn discovered_splits(&self) -> Option> { - self.splits.lock().await.splits.clone() - } - - pub fn get_enable_adaptive_splits(&self) -> bool { - self.enable_adaptive_splits - } -} - pub struct SourceManagerCore { metadata_manager: MetadataManager, @@ -300,36 +84,6 @@ pub struct SourceManagerRunningInfo { pub actor_splits: HashMap>, } -async fn handle_discover_splits( - handle: &ConnectorSourceWorkerHandle, - source_id: SourceId, - actors: &HashSet, -) -> MetaResult, SplitImpl>> { - let Some(mut discovered_splits) = handle.discovered_splits().await else { - tracing::info!( - "The discover loop for source {} is not ready yet; we'll wait for the next run", - source_id - ); - return Ok(BTreeMap::new()); - }; - if discovered_splits.is_empty() { - tracing::warn!("No splits discovered for source {}", source_id); - } - - if handle.enable_adaptive_splits { - // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment. - // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id. - // And prev splits record should be dropped via CN. - - debug_assert!(handle.enable_drop_split); - debug_assert!(discovered_splits.len() == 1); - discovered_splits = - fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?; - } - - Ok(discovered_splits) -} - impl SourceManagerCore { fn new( metadata_manager: MetadataManager, @@ -347,117 +101,6 @@ impl SourceManagerCore { } } - /// Checks whether the external source metadata has changed, - /// and re-assigns splits if there's a diff. - /// - /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`, - /// after the mutation barrier has been collected. 
- async fn reassign_splits(&self) -> MetaResult> { - let mut split_assignment: SplitAssignment = HashMap::new(); - - 'loop_source: for (source_id, handle) in &self.managed_sources { - let source_fragment_ids = match self.source_fragments.get(source_id) { - Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids, - _ => { - continue; - } - }; - let backfill_fragment_ids = self.backfill_fragments.get(source_id); - - 'loop_fragment: for &fragment_id in source_fragment_ids { - let actors = match self - .metadata_manager - .get_running_actors_of_fragment(fragment_id) - .await - { - Ok(actors) => { - if actors.is_empty() { - tracing::warn!("No actors found for fragment {}", fragment_id); - continue 'loop_fragment; - } - actors - } - Err(err) => { - tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore"); - continue 'loop_fragment; - } - }; - - let discovered_splits = handle_discover_splits(handle, *source_id, &actors).await?; - if discovered_splits.is_empty() { - // The discover loop for this source is not ready yet; we'll wait for the next run - continue 'loop_source; - } - - let prev_actor_splits: HashMap<_, _> = actors - .into_iter() - .map(|actor_id| { - ( - actor_id, - self.actor_splits - .get(&actor_id) - .cloned() - .unwrap_or_default(), - ) - }) - .collect(); - - if let Some(new_assignment) = reassign_splits( - fragment_id, - prev_actor_splits, - &discovered_splits, - SplitDiffOptions { - enable_scale_in: handle.enable_drop_split, - enable_adaptive: handle.enable_adaptive_splits, - }, - ) { - split_assignment.insert(fragment_id, new_assignment); - } - } - - if let Some(backfill_fragment_ids) = backfill_fragment_ids { - // align splits for backfill fragments with its upstream source fragment - for (fragment_id, upstream_fragment_id) in backfill_fragment_ids { - let Some(upstream_assignment) = split_assignment.get(upstream_fragment_id) - else { - // upstream fragment unchanged, do not update backfill fragment too - continue; - }; - let actors = match self - .metadata_manager - .get_running_actors_for_source_backfill(*fragment_id) - .await - { - Ok(actors) => { - if actors.is_empty() { - tracing::warn!("No actors found for fragment {}", fragment_id); - continue; - } - actors - } - Err(err) => { - tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore"); - continue; - } - }; - split_assignment.insert( - *fragment_id, - align_splits( - actors, - upstream_assignment, - *fragment_id, - *upstream_fragment_id, - )?, - ); - } - } - } - - self.metadata_manager - .split_fragment_map_by_database(split_assignment) - .await - } - /// Updates states after all kinds of source change. pub fn apply_source_change(&mut self, source_change: SourceChange) { let mut added_source_fragments = Default::default(); @@ -593,203 +236,6 @@ impl SourceManagerCore { } } -/// Note: the `PartialEq` and `Ord` impl just compares the number of splits. -#[derive(Debug)] -struct ActorSplitsAssignment { - actor_id: ActorId, - splits: Vec, -} - -impl Eq for ActorSplitsAssignment {} - -impl PartialEq for ActorSplitsAssignment { - fn eq(&self, other: &Self) -> bool { - self.splits.len() == other.splits.len() - } -} - -impl PartialOrd for ActorSplitsAssignment { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ActorSplitsAssignment { - fn cmp(&self, other: &Self) -> Ordering { - // Note: this is reversed order, to make BinaryHeap a min heap. 
- other.splits.len().cmp(&self.splits.len()) - } -} - -#[derive(Debug)] -struct SplitDiffOptions { - enable_scale_in: bool, - - /// For most connectors, this should be false. When enabled, RisingWave will not track any progress. - enable_adaptive: bool, -} - -#[allow(clippy::derivable_impls)] -impl Default for SplitDiffOptions { - fn default() -> Self { - SplitDiffOptions { - enable_scale_in: false, - enable_adaptive: false, - } - } -} - -/// Reassigns splits if there are new splits or dropped splits, -/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled. -/// -/// The existing splits will remain unmoved in their currently assigned actor. -/// -/// If an actor has an upstream actor, it should be a backfill executor, -/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case. -/// Use [`align_splits`] instead.** -/// -/// - `fragment_id`: just for logging -/// -/// ## Different connectors' behavior of split change -/// -/// ### Kafka and Pulsar -/// They only support increasing the number of splits via adding new empty splits. -/// Old data is not moved. -/// -/// ### Kinesis -/// It supports *pairwise* shard split and merge. -/// -/// In both cases, old data remain in the old shard(s) and the old shard is still available. -/// New data are routed to the new shard(s). -/// After the retention period has expired, the old shard will become `EXPIRED` and isn't -/// listed any more. In other words, the total number of shards will first increase and then decrease. -/// -/// See also: -/// - [Kinesis resharding doc](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing) -/// - An example of how the shards can be like: -fn reassign_splits( - fragment_id: FragmentId, - actor_splits: HashMap>, - discovered_splits: &BTreeMap, - opts: SplitDiffOptions, -) -> Option>> -where - T: SplitMetaData + Clone, -{ - // if no actors, return - if actor_splits.is_empty() { - return None; - } - - let prev_split_ids: HashSet<_> = actor_splits - .values() - .flat_map(|splits| splits.iter().map(SplitMetaData::id)) - .collect(); - - tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits"); - tracing::trace!(fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits"); - - let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect(); - - let dropped_splits: HashSet<_> = prev_split_ids - .difference(&discovered_split_ids) - .cloned() - .collect(); - - if !dropped_splits.is_empty() { - if opts.enable_scale_in { - tracing::info!(fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits"); - } else { - tracing::warn!(fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed"); - } - } - - let new_discovered_splits: BTreeSet<_> = discovered_split_ids - .into_iter() - .filter(|split_id| !prev_split_ids.contains(split_id)) - .collect(); - - if opts.enable_scale_in || opts.enable_adaptive { - // if we support scale in, no more splits are discovered, and no splits are dropped, return - // we need to check if discovered_split_ids is empty, because if it is empty, we need to - // handle the case of scale in to zero (like deleting all objects from s3) - if dropped_splits.is_empty() - && new_discovered_splits.is_empty() - && !discovered_splits.is_empty() - { - return None; - } - } else { - // if we do not support scale in, and no more splits are discovered, 
return - if new_discovered_splits.is_empty() && !discovered_splits.is_empty() { - return None; - } - } - - tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits"); - - let mut heap = BinaryHeap::with_capacity(actor_splits.len()); - - for (actor_id, mut splits) in actor_splits { - if opts.enable_scale_in || opts.enable_adaptive { - splits.retain(|split| !dropped_splits.contains(&split.id())); - } - - heap.push(ActorSplitsAssignment { actor_id, splits }) - } - - for split_id in new_discovered_splits { - // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e., - // we get the assignment with the least splits here. - - // Note: If multiple actors have the same number of splits, it will be randomly picked. - // When the number of source actors is larger than the number of splits, - // It's possible that the assignment is uneven. - // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158 - // TODO: We should make the assignment rack-aware to make sure it's even. - let mut peek_ref = heap.peek_mut().unwrap(); - peek_ref - .splits - .push(discovered_splits.get(&split_id).cloned().unwrap()); - } - - Some( - heap.into_iter() - .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits)) - .collect(), - ) -} - -/// Assign splits to a new set of actors, according to existing assignment. -/// -/// illustration: -/// ```text -/// upstream new -/// actor x1 [split 1, split2] -> actor y1 [split 1, split2] -/// actor x2 [split 3] -> actor y2 [split 3] -/// ... -/// ``` -fn align_splits( - // (actor_id, upstream_actor_id) - aligned_actors: impl IntoIterator, - existing_assignment: &HashMap>, - fragment_id: FragmentId, - upstream_source_fragment_id: FragmentId, -) -> anyhow::Result>> { - aligned_actors - .into_iter() - .map(|(actor_id, upstream_actor_id)| { - let Some(splits) = existing_assignment.get(&upstream_actor_id) else { - return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {existing_assignment:?}, upstream_actor_id: {upstream_actor_id:?}")); - }; - Ok(( - actor_id, - splits.clone(), - )) - }) - .collect() -} - impl SourceManager { const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10); @@ -802,7 +248,7 @@ impl SourceManager { { let sources = metadata_manager.list_sources().await?; for source in sources { - Self::create_source_worker_async(source, &mut managed_sources, metrics.clone())? + create_source_worker_async(source, &mut managed_sources, metrics.clone())? } } @@ -904,283 +350,12 @@ impl SourceManager { core.apply_source_change(source_change); } - /// Migrates splits from previous actors to the new actors for a rescheduled fragment. - /// - /// Very occasionally split removal may happen during scaling, in which case we need to - /// use the old splits for reallocation instead of the latest splits (which may be missing), - /// so that we can resolve the split removal in the next command. 
- pub async fn migrate_splits_for_source_actors( - &self, - fragment_id: FragmentId, - prev_actor_ids: &[ActorId], - curr_actor_ids: &[ActorId], - ) -> MetaResult>> { - let core = self.core.lock().await; - - let prev_splits = prev_actor_ids - .iter() - .flat_map(|actor_id| core.actor_splits.get(actor_id).unwrap()) - .map(|split| (split.id(), split.clone())) - .collect(); - - let empty_actor_splits = curr_actor_ids - .iter() - .map(|actor_id| (*actor_id, vec![])) - .collect(); - - let diff = reassign_splits( - fragment_id, - empty_actor_splits, - &prev_splits, - // pre-allocate splits is the first time getting splits and it does not have scale-in scene - SplitDiffOptions::default(), - ) - .unwrap_or_default(); - - Ok(diff) - } - - /// Migrates splits from previous actors to the new actors for a rescheduled fragment. - pub fn migrate_splits_for_backfill_actors( - &self, - fragment_id: FragmentId, - upstream_source_fragment_id: FragmentId, - curr_actor_ids: &[ActorId], - fragment_actor_splits: &HashMap>>, - no_shuffle_upstream_actor_map: &HashMap>, - ) -> MetaResult>> { - // align splits for backfill fragments with its upstream source fragment - let actors = no_shuffle_upstream_actor_map - .iter() - .filter(|(id, _)| curr_actor_ids.contains(id)) - .map(|(id, upstream_fragment_actors)| { - ( - *id, - *upstream_fragment_actors - .get(&upstream_source_fragment_id) - .unwrap(), - ) - }); - let upstream_assignment = fragment_actor_splits - .get(&upstream_source_fragment_id) - .unwrap(); - tracing::info!( - fragment_id, - upstream_source_fragment_id, - ?upstream_assignment, - "migrate_splits_for_backfill_actors" - ); - Ok(align_splits( - actors, - upstream_assignment, - fragment_id, - upstream_source_fragment_id, - )?) - } - - /// Allocates splits to actors for a newly created source executor. - pub async fn allocate_splits(&self, job_id: &TableId) -> MetaResult { - let core = self.core.lock().await; - let table_fragments = core - .metadata_manager - .get_job_fragments_by_id(job_id) - .await?; - - let source_fragments = table_fragments.stream_source_fragments(); - - let mut assigned = HashMap::new(); - - 'loop_source: for (source_id, fragments) in source_fragments { - let handle = core - .managed_sources - .get(&source_id) - .with_context(|| format!("could not find source {}", source_id))?; - - if handle.splits.lock().await.splits.is_none() { - // force refresh source - let (tx, rx) = oneshot::channel(); - handle - .sync_call_tx - .send(tx) - .ok() - .context("failed to send sync call")?; - rx.await - .ok() - .context("failed to receive sync call response")??; - } - - for fragment_id in fragments { - let empty_actor_splits: HashMap> = table_fragments - .fragments - .get(&fragment_id) - .unwrap() - .actors - .iter() - .map(|actor| (actor.actor_id, vec![])) - .collect(); - let actor_hashset: HashSet = empty_actor_splits.keys().cloned().collect(); - let splits = handle_discover_splits(handle, source_id, &actor_hashset).await?; - if splits.is_empty() { - tracing::warn!("no splits detected for source {}", source_id); - continue 'loop_source; - } - - if let Some(diff) = reassign_splits( - fragment_id, - empty_actor_splits, - &splits, - SplitDiffOptions::default(), - ) { - assigned.insert(fragment_id, diff); - } - } - } - - Ok(assigned) - } - - /// Allocates splits to actors for replace source job. 
- pub async fn allocate_splits_for_replace_source( - &self, - job_id: &TableId, - merge_updates: &Vec, - ) -> MetaResult { - tracing::debug!(?merge_updates, "allocate_splits_for_replace_source"); - if merge_updates.is_empty() { - // no existing downstream. We can just re-allocate splits arbitrarily. - return self.allocate_splits(job_id).await; - } - - let core = self.core.lock().await; - let table_fragments = core - .metadata_manager - .get_job_fragments_by_id(job_id) - .await?; - - let source_fragments = table_fragments.stream_source_fragments(); - assert_eq!( - source_fragments.len(), - 1, - "replace source job should only have one source" - ); - let (_source_id, fragments) = source_fragments.into_iter().next().unwrap(); - assert_eq!( - fragments.len(), - 1, - "replace source job should only have one fragment" - ); - let fragment_id = fragments.into_iter().next().unwrap(); - - debug_assert!( - !merge_updates.is_empty() - && merge_updates.iter().all(|merge_update| { - merge_update.upstream_fragment_id == merge_updates[0].upstream_fragment_id - && merge_update.new_upstream_fragment_id == Some(fragment_id) - }), - "merge update should only replace one fragment: {:?}", - merge_updates - ); - let prev_fragment_id = merge_updates[0].upstream_fragment_id; - // Here we align the new source executor to backfill executors - // - // old_source => new_source backfill_1 - // actor_x1 => actor_y1 -----┬------>actor_a1 - // actor_x2 => actor_y2 -----┼-┬---->actor_a2 - // │ │ - // │ │ backfill_2 - // └─┼---->actor_b1 - // └---->actor_b2 - // - // Note: we can choose any backfill actor to align here. - // We use `HashMap` to dedup. - let aligned_actors: HashMap = merge_updates - .iter() - .map(|merge_update| { - assert_eq!(merge_update.added_upstream_actor_id.len(), 1); - // Note: removed_upstream_actor_id is not set for replace job, so we can't use it. - assert_eq!(merge_update.removed_upstream_actor_id.len(), 0); - ( - merge_update.added_upstream_actor_id[0], - merge_update.actor_id, - ) - }) - .collect(); - let assignment = align_splits( - aligned_actors.into_iter(), - &core.actor_splits, - fragment_id, - prev_fragment_id, - )?; - Ok(HashMap::from([(fragment_id, assignment)])) - } - - /// Allocates splits to actors for a newly created `SourceBackfill` executor. - /// - /// Unlike [`Self::allocate_splits`], which creates a new assignment, - /// this method aligns the splits for backfill fragments with its upstream source fragment ([`align_splits`]). 
- pub async fn allocate_splits_for_backfill( - &self, - table_id: &TableId, - // dispatchers from SourceExecutor to SourceBackfillExecutor - dispatchers: &HashMap>, - ) -> MetaResult { - let core = self.core.lock().await; - let table_fragments = core - .metadata_manager - .get_job_fragments_by_id(table_id) - .await?; - - let source_backfill_fragments = table_fragments.source_backfill_fragments()?; - - let mut assigned = HashMap::new(); - - for (_source_id, fragments) in source_backfill_fragments { - for (fragment_id, upstream_source_fragment_id) in fragments { - let upstream_actors = core - .metadata_manager - .get_running_actors_of_fragment(upstream_source_fragment_id) - .await?; - let mut backfill_actors = vec![]; - for upstream_actor in upstream_actors { - if let Some(dispatchers) = dispatchers.get(&upstream_actor) { - let err = || { - anyhow::anyhow!( - "source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}", - fragment_id = fragment_id, - upstream_source_fragment_id = upstream_source_fragment_id, - upstream_actor = upstream_actor, - dispatchers = dispatchers - ) - }; - if dispatchers.len() != 1 || dispatchers[0].downstream_actor_id.len() != 1 { - return Err(err().into()); - } - - backfill_actors - .push((dispatchers[0].downstream_actor_id[0], upstream_actor)); - } - } - assigned.insert( - fragment_id, - align_splits( - backfill_actors, - &core.actor_splits, - fragment_id, - upstream_source_fragment_id, - )?, - ); - } - } - - Ok(assigned) - } - /// create and register connector worker for source. pub async fn register_source(&self, source: &Source) -> MetaResult<()> { tracing::debug!("register_source: {}", source.get_id()); let mut core = self.core.lock().await; if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) { - let handle = create_source_worker_handle(source, self.metrics.clone()) + let handle = create_source_worker(source, self.metrics.clone()) .await .context("failed to create source worker")?; e.insert(handle); @@ -1204,66 +379,6 @@ impl SourceManager { } } - /// Used on startup ([`Self::new`]). Failed sources will not block meta startup. 
- fn create_source_worker_async( - source: Source, - managed_sources: &mut HashMap, - metrics: Arc, - ) -> MetaResult<()> { - tracing::info!("spawning new watcher for source {}", source.id); - - let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None })); - let current_splits_ref = splits.clone(); - let source_id = source.id; - - let connector_properties = extract_prop_from_existing_source(&source)?; - - let enable_drop_split = connector_properties.enable_drop_split(); - let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); - let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); - let handle = tokio::spawn(async move { - let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - - dispatch_source_prop!(connector_properties, prop, { - let mut worker = loop { - ticker.tick().await; - - match ConnectorSourceWorker::create( - &source, - prop.deref().clone(), - DEFAULT_SOURCE_WORKER_TICK_INTERVAL, - current_splits_ref.clone(), - metrics.clone(), - ) - .await - { - Ok(worker) => { - break worker; - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "failed to create source worker"); - } - } - }; - - worker.run(sync_call_rx).await - }); - }); - - managed_sources.insert( - source_id as SourceId, - ConnectorSourceWorkerHandle { - handle, - sync_call_tx, - splits, - enable_drop_split, - enable_adaptive_splits, - }, - ); - Ok(()) - } - pub async fn list_assignments(&self) -> HashMap> { let core = self.core.lock().await; core.actor_splits.clone() @@ -1384,235 +499,3 @@ pub fn build_actor_split_impls( }) .collect() } - -#[cfg(test)] -mod tests { - use std::collections::{BTreeMap, HashMap, HashSet}; - - use risingwave_common::types::JsonbVal; - use risingwave_connector::error::ConnectorResult; - use risingwave_connector::source::{SplitId, SplitMetaData}; - use serde::{Deserialize, Serialize}; - - use crate::model::{ActorId, FragmentId}; - use crate::stream::source_manager::{reassign_splits, SplitDiffOptions}; - - #[derive(Debug, Copy, Clone, Serialize, Deserialize)] - struct TestSplit { - id: u32, - } - - impl SplitMetaData for TestSplit { - fn id(&self) -> SplitId { - format!("{}", self.id).into() - } - - fn encode_to_json(&self) -> JsonbVal { - serde_json::to_value(*self).unwrap().into() - } - - fn restore_from_json(value: JsonbVal) -> ConnectorResult { - serde_json::from_value(value.take()).map_err(Into::into) - } - - fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> { - Ok(()) - } - } - - fn check_all_splits( - discovered_splits: &BTreeMap, - diff: &HashMap>, - ) { - let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect(); - - for splits in diff.values() { - for split in splits { - assert!(split_ids.remove(&split.id())) - } - } - - assert!(split_ids.is_empty()); - } - - #[test] - fn test_drop_splits() { - let mut actor_splits: HashMap = HashMap::new(); - actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]); - actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]); - actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]); - - let mut prev_split_to_actor = HashMap::new(); - for (actor_id, splits) in &actor_splits { - for split in splits { - prev_split_to_actor.insert(split.id(), *actor_id); - } - } - - let discovered_splits: BTreeMap = (1..5) - .map(|i| { - let split = TestSplit { id: i }; - (split.id(), split) - }) - .collect(); - - let opts = SplitDiffOptions { - 
enable_scale_in: true, - enable_adaptive: false, - }; - - let prev_split_ids: HashSet<_> = actor_splits - .values() - .flat_map(|splits| splits.iter().map(|split| split.id())) - .collect(); - - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - opts, - ) - .unwrap(); - check_all_splits(&discovered_splits, &diff); - - let mut after_split_to_actor = HashMap::new(); - for (actor_id, splits) in &diff { - for split in splits { - after_split_to_actor.insert(split.id(), *actor_id); - } - } - - let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect(); - - let retained_split_ids: HashSet<_> = - prev_split_ids.intersection(&discovered_split_ids).collect(); - - for retained_split_id in retained_split_ids { - assert_eq!( - prev_split_to_actor.get(retained_split_id), - after_split_to_actor.get(retained_split_id) - ) - } - } - - #[test] - fn test_drop_splits_to_empty() { - let mut actor_splits: HashMap = HashMap::new(); - actor_splits.insert(0, vec![TestSplit { id: 0 }]); - - let discovered_splits: BTreeMap = BTreeMap::new(); - - let opts = SplitDiffOptions { - enable_scale_in: true, - enable_adaptive: false, - }; - - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - opts, - ) - .unwrap(); - - assert!(!diff.is_empty()) - } - - #[test] - fn test_reassign_splits() { - let actor_splits = HashMap::new(); - let discovered_splits: BTreeMap = BTreeMap::new(); - assert!(reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - Default::default() - ) - .is_none()); - - let actor_splits = (0..3).map(|i| (i, vec![])).collect(); - let discovered_splits: BTreeMap = BTreeMap::new(); - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 3); - for splits in diff.values() { - assert!(splits.is_empty()) - } - - let actor_splits = (0..3).map(|i| (i, vec![])).collect(); - let discovered_splits: BTreeMap = (0..3) - .map(|i| { - let split = TestSplit { id: i }; - (split.id(), split) - }) - .collect(); - - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 3); - for splits in diff.values() { - assert_eq!(splits.len(), 1); - } - - check_all_splits(&discovered_splits, &diff); - - let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); - let discovered_splits: BTreeMap = (0..5) - .map(|i| { - let split = TestSplit { id: i }; - (split.id(), split) - }) - .collect(); - - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 3); - for splits in diff.values() { - let len = splits.len(); - assert!(len == 1 || len == 2); - } - - check_all_splits(&discovered_splits, &diff); - - let mut actor_splits: HashMap> = - (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); - actor_splits.insert(3, vec![]); - actor_splits.insert(4, vec![]); - - let discovered_splits: BTreeMap = (0..5) - .map(|i| { - let split = TestSplit { id: i }; - (split.id(), split) - }) - .collect(); - - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - Default::default(), - ) - .unwrap(); - assert_eq!(diff.len(), 5); - for splits in diff.values() { - assert_eq!(splits.len(), 1); - } - - check_all_splits(&discovered_splits, &diff); - } -} diff --git 
a/src/meta/src/stream/source_manager/split_assignment.rs b/src/meta/src/stream/source_manager/split_assignment.rs new file mode 100644 index 0000000000000..e36c1198bfc96 --- /dev/null +++ b/src/meta/src/stream/source_manager/split_assignment.rs @@ -0,0 +1,830 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +impl SourceManager { + /// Migrates splits from previous actors to the new actors for a rescheduled fragment. + /// + /// Very occasionally split removal may happen during scaling, in which case we need to + /// use the old splits for reallocation instead of the latest splits (which may be missing), + /// so that we can resolve the split removal in the next command. + pub async fn migrate_splits_for_source_actors( + &self, + fragment_id: FragmentId, + prev_actor_ids: &[ActorId], + curr_actor_ids: &[ActorId], + ) -> MetaResult>> { + let core = self.core.lock().await; + + let prev_splits = prev_actor_ids + .iter() + .flat_map(|actor_id| core.actor_splits.get(actor_id).unwrap()) + .map(|split| (split.id(), split.clone())) + .collect(); + + let empty_actor_splits = curr_actor_ids + .iter() + .map(|actor_id| (*actor_id, vec![])) + .collect(); + + let diff = reassign_splits( + fragment_id, + empty_actor_splits, + &prev_splits, + // pre-allocate splits is the first time getting splits and it does not have scale-in scene + SplitDiffOptions::default(), + ) + .unwrap_or_default(); + + Ok(diff) + } + + /// Migrates splits from previous actors to the new actors for a rescheduled fragment. + pub fn migrate_splits_for_backfill_actors( + &self, + fragment_id: FragmentId, + upstream_source_fragment_id: FragmentId, + curr_actor_ids: &[ActorId], + fragment_actor_splits: &HashMap>>, + no_shuffle_upstream_actor_map: &HashMap>, + ) -> MetaResult>> { + // align splits for backfill fragments with its upstream source fragment + let actors = no_shuffle_upstream_actor_map + .iter() + .filter(|(id, _)| curr_actor_ids.contains(id)) + .map(|(id, upstream_fragment_actors)| { + ( + *id, + *upstream_fragment_actors + .get(&upstream_source_fragment_id) + .unwrap(), + ) + }); + let upstream_assignment = fragment_actor_splits + .get(&upstream_source_fragment_id) + .unwrap(); + tracing::info!( + fragment_id, + upstream_source_fragment_id, + ?upstream_assignment, + "migrate_splits_for_backfill_actors" + ); + Ok(align_splits( + actors, + upstream_assignment, + fragment_id, + upstream_source_fragment_id, + )?) + } + + /// Allocates splits to actors for a newly created source executor. 
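+    ///
+    /// For illustration, the returned [`SplitAssignment`] maps
+    /// `fragment_id -> actor_id -> splits`; e.g. a source with four splits and
+    /// two actors in one fragment may be assigned as (all ids made up):
+    ///
+    /// ```text
+    /// fragment 1: actor 10 -> [split-0, split-2]
+    ///             actor 11 -> [split-1, split-3]
+    /// ```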
+ pub async fn allocate_splits(&self, job_id: &TableId) -> MetaResult { + let core = self.core.lock().await; + let table_fragments = core + .metadata_manager + .get_job_fragments_by_id(job_id) + .await?; + + let source_fragments = table_fragments.stream_source_fragments(); + + let mut assigned = HashMap::new(); + + 'loop_source: for (source_id, fragments) in source_fragments { + let handle = core + .managed_sources + .get(&source_id) + .with_context(|| format!("could not find source {}", source_id))?; + + if handle.splits.lock().await.splits.is_none() { + // force refresh source + let (tx, rx) = oneshot::channel(); + handle + .sync_call_tx + .send(tx) + .ok() + .context("failed to send sync call")?; + rx.await + .ok() + .context("failed to receive sync call response")??; + } + + for fragment_id in fragments { + let empty_actor_splits: HashMap> = table_fragments + .fragments + .get(&fragment_id) + .unwrap() + .actors + .iter() + .map(|actor| (actor.actor_id, vec![])) + .collect(); + let actor_hashset: HashSet = empty_actor_splits.keys().cloned().collect(); + let splits = handle.discovered_splits(source_id, &actor_hashset).await?; + if splits.is_empty() { + tracing::warn!("no splits detected for source {}", source_id); + continue 'loop_source; + } + + if let Some(diff) = reassign_splits( + fragment_id, + empty_actor_splits, + &splits, + SplitDiffOptions::default(), + ) { + assigned.insert(fragment_id, diff); + } + } + } + + Ok(assigned) + } + + /// Allocates splits to actors for replace source job. + pub async fn allocate_splits_for_replace_source( + &self, + job_id: &TableId, + merge_updates: &Vec, + ) -> MetaResult { + tracing::debug!(?merge_updates, "allocate_splits_for_replace_source"); + if merge_updates.is_empty() { + // no existing downstream. We can just re-allocate splits arbitrarily. + return self.allocate_splits(job_id).await; + } + + let core = self.core.lock().await; + let table_fragments = core + .metadata_manager + .get_job_fragments_by_id(job_id) + .await?; + + let source_fragments = table_fragments.stream_source_fragments(); + assert_eq!( + source_fragments.len(), + 1, + "replace source job should only have one source" + ); + let (_source_id, fragments) = source_fragments.into_iter().next().unwrap(); + assert_eq!( + fragments.len(), + 1, + "replace source job should only have one fragment" + ); + let fragment_id = fragments.into_iter().next().unwrap(); + + debug_assert!( + !merge_updates.is_empty() + && merge_updates.iter().all(|merge_update| { + merge_update.upstream_fragment_id == merge_updates[0].upstream_fragment_id + && merge_update.new_upstream_fragment_id == Some(fragment_id) + }), + "merge update should only replace one fragment: {:?}", + merge_updates + ); + let prev_fragment_id = merge_updates[0].upstream_fragment_id; + // Here we align the new source executor to backfill executors + // + // old_source => new_source backfill_1 + // actor_x1 => actor_y1 -----┬------>actor_a1 + // actor_x2 => actor_y2 -----┼-┬---->actor_a2 + // │ │ + // │ │ backfill_2 + // └─┼---->actor_b1 + // └---->actor_b2 + // + // Note: we can choose any backfill actor to align here. + // We use `HashMap` to dedup. + let aligned_actors: HashMap = merge_updates + .iter() + .map(|merge_update| { + assert_eq!(merge_update.added_upstream_actor_id.len(), 1); + // Note: removed_upstream_actor_id is not set for replace job, so we can't use it. 
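+                // Instead, pair each newly-added upstream (source) actor with the
+                // backfill actor it now feeds, so that `align_splits` below can copy
+                // that backfill actor's current splits over to it.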
+ assert_eq!(merge_update.removed_upstream_actor_id.len(), 0); + ( + merge_update.added_upstream_actor_id[0], + merge_update.actor_id, + ) + }) + .collect(); + let assignment = align_splits( + aligned_actors.into_iter(), + &core.actor_splits, + fragment_id, + prev_fragment_id, + )?; + Ok(HashMap::from([(fragment_id, assignment)])) + } + + /// Allocates splits to actors for a newly created `SourceBackfill` executor. + /// + /// Unlike [`Self::allocate_splits`], which creates a new assignment, + /// this method aligns the splits for backfill fragments with its upstream source fragment ([`align_splits`]). + pub async fn allocate_splits_for_backfill( + &self, + table_id: &TableId, + // dispatchers from SourceExecutor to SourceBackfillExecutor + dispatchers: &HashMap>, + ) -> MetaResult { + let core = self.core.lock().await; + let table_fragments = core + .metadata_manager + .get_job_fragments_by_id(table_id) + .await?; + + let source_backfill_fragments = table_fragments.source_backfill_fragments()?; + + let mut assigned = HashMap::new(); + + for (_source_id, fragments) in source_backfill_fragments { + for (fragment_id, upstream_source_fragment_id) in fragments { + let upstream_actors = core + .metadata_manager + .get_running_actors_of_fragment(upstream_source_fragment_id) + .await?; + let mut backfill_actors = vec![]; + for upstream_actor in upstream_actors { + if let Some(dispatchers) = dispatchers.get(&upstream_actor) { + let err = || { + anyhow::anyhow!( + "source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}", + fragment_id = fragment_id, + upstream_source_fragment_id = upstream_source_fragment_id, + upstream_actor = upstream_actor, + dispatchers = dispatchers + ) + }; + if dispatchers.len() != 1 || dispatchers[0].downstream_actor_id.len() != 1 { + return Err(err().into()); + } + + backfill_actors + .push((dispatchers[0].downstream_actor_id[0], upstream_actor)); + } + } + assigned.insert( + fragment_id, + align_splits( + backfill_actors, + &core.actor_splits, + fragment_id, + upstream_source_fragment_id, + )?, + ); + } + } + + Ok(assigned) + } +} + +impl SourceManagerCore { + /// Checks whether the external source metadata has changed, + /// and re-assigns splits if there's a diff. + /// + /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`, + /// after the mutation barrier has been collected. 
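+    ///
+    /// A sketch of the overall flow:
+    ///
+    /// ```text
+    /// ConnectorSourceWorker::tick ──> SourceManager::tick ──> Self::reassign_splits
+    ///  (refresh split metadata)        (periodic check)         (compute the diff)
+    ///                                                                  │
+    ///                                                  split-change barrier command
+    ///                                                                  │
+    ///                                  Self::apply_source_change (update `actor_splits`)
+    /// ```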
+ pub async fn reassign_splits(&self) -> MetaResult> { + let mut split_assignment: SplitAssignment = HashMap::new(); + + 'loop_source: for (source_id, handle) in &self.managed_sources { + let source_fragment_ids = match self.source_fragments.get(source_id) { + Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids, + _ => { + continue; + } + }; + let backfill_fragment_ids = self.backfill_fragments.get(source_id); + + 'loop_fragment: for &fragment_id in source_fragment_ids { + let actors = match self + .metadata_manager + .get_running_actors_of_fragment(fragment_id) + .await + { + Ok(actors) => { + if actors.is_empty() { + tracing::warn!("No actors found for fragment {}", fragment_id); + continue 'loop_fragment; + } + actors + } + Err(err) => { + tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore"); + continue 'loop_fragment; + } + }; + + let discovered_splits = handle.discovered_splits(*source_id, &actors).await?; + if discovered_splits.is_empty() { + // The discover loop for this source is not ready yet; we'll wait for the next run + continue 'loop_source; + } + + let prev_actor_splits: HashMap<_, _> = actors + .into_iter() + .map(|actor_id| { + ( + actor_id, + self.actor_splits + .get(&actor_id) + .cloned() + .unwrap_or_default(), + ) + }) + .collect(); + + if let Some(new_assignment) = reassign_splits( + fragment_id, + prev_actor_splits, + &discovered_splits, + SplitDiffOptions { + enable_scale_in: handle.enable_drop_split, + enable_adaptive: handle.enable_adaptive_splits, + }, + ) { + split_assignment.insert(fragment_id, new_assignment); + } + } + + if let Some(backfill_fragment_ids) = backfill_fragment_ids { + // align splits for backfill fragments with its upstream source fragment + for (fragment_id, upstream_fragment_id) in backfill_fragment_ids { + let Some(upstream_assignment) = split_assignment.get(upstream_fragment_id) + else { + // upstream fragment unchanged, do not update backfill fragment too + continue; + }; + let actors = match self + .metadata_manager + .get_running_actors_for_source_backfill(*fragment_id) + .await + { + Ok(actors) => { + if actors.is_empty() { + tracing::warn!("No actors found for fragment {}", fragment_id); + continue; + } + actors + } + Err(err) => { + tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore"); + continue; + } + }; + split_assignment.insert( + *fragment_id, + align_splits( + actors, + upstream_assignment, + *fragment_id, + *upstream_fragment_id, + )?, + ); + } + } + } + + self.metadata_manager + .split_fragment_map_by_database(split_assignment) + .await + } +} + +/// Reassigns splits if there are new splits or dropped splits, +/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled. +/// +/// The existing splits will remain unmoved in their currently assigned actor. +/// +/// If an actor has an upstream actor, it should be a backfill executor, +/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case. +/// Use [`align_splits`] instead.** +/// +/// - `fragment_id`: just for logging +/// +/// ## Different connectors' behavior of split change +/// +/// ### Kafka and Pulsar +/// They only support increasing the number of splits via adding new empty splits. +/// Old data is not moved. +/// +/// ### Kinesis +/// It supports *pairwise* shard split and merge. 
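+///
+/// For example (shard names are illustrative):
+///
+/// ```text
+/// split:  shard-1 ──┬──> shard-2      merge:  shard-2 ──┐
+///                   └──> shard-3              shard-3 ──┴──> shard-4
+/// ```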
+///
+/// In both cases, old data remain in the old shard(s) and the old shard is still available.
+/// New data are routed to the new shard(s).
+/// After the retention period has expired, the old shard will become `EXPIRED` and isn't
+/// listed any more. In other words, the total number of shards will first increase and then decrease.
+///
+/// See also:
+/// - [Kinesis resharding doc](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing)
+/// - An example of how the shards can be like:
+fn reassign_splits<T>(
+    fragment_id: FragmentId,
+    actor_splits: HashMap<ActorId, Vec<T>>,
+    discovered_splits: &BTreeMap<SplitId, T>,
+    opts: SplitDiffOptions,
+) -> Option<HashMap<ActorId, Vec<T>>>
+where
+    T: SplitMetaData + Clone,
+{
+    // if no actors, return
+    if actor_splits.is_empty() {
+        return None;
+    }
+
+    let prev_split_ids: HashSet<_> = actor_splits
+        .values()
+        .flat_map(|splits| splits.iter().map(SplitMetaData::id))
+        .collect();
+
+    tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
+    tracing::trace!(fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits");
+
+    let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
+
+    let dropped_splits: HashSet<_> = prev_split_ids
+        .difference(&discovered_split_ids)
+        .cloned()
+        .collect();
+
+    if !dropped_splits.is_empty() {
+        if opts.enable_scale_in {
+            tracing::info!(fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits");
+        } else {
+            tracing::warn!(fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed");
+        }
+    }
+
+    let new_discovered_splits: BTreeSet<_> = discovered_split_ids
+        .into_iter()
+        .filter(|split_id| !prev_split_ids.contains(split_id))
+        .collect();
+
+    if opts.enable_scale_in || opts.enable_adaptive {
+        // if we support scale in, no more splits are discovered, and no splits are dropped, return
+        // we need to check if discovered_split_ids is empty, because if it is empty, we need to
+        // handle the case of scale in to zero (like deleting all objects from s3)
+        if dropped_splits.is_empty()
+            && new_discovered_splits.is_empty()
+            && !discovered_splits.is_empty()
+        {
+            return None;
+        }
+    } else {
+        // if we do not support scale in, and no more splits are discovered, return
+        if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
+            return None;
+        }
+    }
+
+    tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");
+
+    let mut heap = BinaryHeap::with_capacity(actor_splits.len());
+
+    for (actor_id, mut splits) in actor_splits {
+        if opts.enable_scale_in || opts.enable_adaptive {
+            splits.retain(|split| !dropped_splits.contains(&split.id()));
+        }
+
+        heap.push(ActorSplitsAssignment { actor_id, splits })
+    }
+
+    for split_id in new_discovered_splits {
+        // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e.,
+        // we get the assignment with the least splits here.
+
+        // Note: If multiple actors have the same number of splits, it will be randomly picked.
+        // When the number of source actors is larger than the number of splits,
+        // It's possible that the assignment is uneven.
+        // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158
+        // TODO: We should make the assignment rack-aware to make sure it's even.
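+        // `peek_mut` returns the top of the heap, i.e. the actor currently holding
+        // the fewest splits (the reversed `Ord` of `ActorSplitsAssignment` makes
+        // this a min-heap); dropping the `PeekMut` guard afterwards lets the heap
+        // restore its ordering with the newly pushed split counted.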
+ let mut peek_ref = heap.peek_mut().unwrap(); + peek_ref + .splits + .push(discovered_splits.get(&split_id).cloned().unwrap()); + } + + Some( + heap.into_iter() + .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits)) + .collect(), + ) +} + +/// Assign splits to a new set of actors, according to existing assignment. +/// +/// illustration: +/// ```text +/// upstream new +/// actor x1 [split 1, split2] -> actor y1 [split 1, split2] +/// actor x2 [split 3] -> actor y2 [split 3] +/// ... +/// ``` +fn align_splits( + // (actor_id, upstream_actor_id) + aligned_actors: impl IntoIterator, + existing_assignment: &HashMap>, + fragment_id: FragmentId, + upstream_source_fragment_id: FragmentId, +) -> anyhow::Result>> { + aligned_actors + .into_iter() + .map(|(actor_id, upstream_actor_id)| { + let Some(splits) = existing_assignment.get(&upstream_actor_id) else { + return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {existing_assignment:?}, upstream_actor_id: {upstream_actor_id:?}")); + }; + Ok(( + actor_id, + splits.clone(), + )) + }) + .collect() +} + +/// Note: the `PartialEq` and `Ord` impl just compares the number of splits. +#[derive(Debug)] +struct ActorSplitsAssignment { + actor_id: ActorId, + splits: Vec, +} + +impl Eq for ActorSplitsAssignment {} + +impl PartialEq for ActorSplitsAssignment { + fn eq(&self, other: &Self) -> bool { + self.splits.len() == other.splits.len() + } +} + +impl PartialOrd for ActorSplitsAssignment { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ActorSplitsAssignment { + fn cmp(&self, other: &Self) -> Ordering { + // Note: this is reversed order, to make BinaryHeap a min heap. + other.splits.len().cmp(&self.splits.len()) + } +} + +#[derive(Debug)] +pub struct SplitDiffOptions { + pub enable_scale_in: bool, + + /// For most connectors, this should be false. When enabled, RisingWave will not track any progress. 
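+    /// Derived from `enable_adaptive_splits()` on the connector properties when
+    /// the worker handle is created.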
+ pub enable_adaptive: bool, +} + +#[allow(clippy::derivable_impls)] +impl Default for SplitDiffOptions { + fn default() -> Self { + SplitDiffOptions { + enable_scale_in: false, + enable_adaptive: false, + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, HashMap, HashSet}; + + use risingwave_common::types::JsonbVal; + use risingwave_connector::error::ConnectorResult; + use risingwave_connector::source::{SplitId, SplitMetaData}; + use serde::{Deserialize, Serialize}; + + use super::*; + use crate::model::{ActorId, FragmentId}; + + #[derive(Debug, Copy, Clone, Serialize, Deserialize)] + struct TestSplit { + id: u32, + } + + impl SplitMetaData for TestSplit { + fn id(&self) -> SplitId { + format!("{}", self.id).into() + } + + fn encode_to_json(&self) -> JsonbVal { + serde_json::to_value(*self).unwrap().into() + } + + fn restore_from_json(value: JsonbVal) -> ConnectorResult { + serde_json::from_value(value.take()).map_err(Into::into) + } + + fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> { + Ok(()) + } + } + + fn check_all_splits( + discovered_splits: &BTreeMap, + diff: &HashMap>, + ) { + let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect(); + + for splits in diff.values() { + for split in splits { + assert!(split_ids.remove(&split.id())) + } + } + + assert!(split_ids.is_empty()); + } + + #[test] + fn test_drop_splits() { + let mut actor_splits: HashMap = HashMap::new(); + actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]); + actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]); + actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]); + + let mut prev_split_to_actor = HashMap::new(); + for (actor_id, splits) in &actor_splits { + for split in splits { + prev_split_to_actor.insert(split.id(), *actor_id); + } + } + + let discovered_splits: BTreeMap = (1..5) + .map(|i| { + let split = TestSplit { id: i }; + (split.id(), split) + }) + .collect(); + + let opts = SplitDiffOptions { + enable_scale_in: true, + enable_adaptive: false, + }; + + let prev_split_ids: HashSet<_> = actor_splits + .values() + .flat_map(|splits| splits.iter().map(|split| split.id())) + .collect(); + + let diff = reassign_splits( + FragmentId::default(), + actor_splits, + &discovered_splits, + opts, + ) + .unwrap(); + check_all_splits(&discovered_splits, &diff); + + let mut after_split_to_actor = HashMap::new(); + for (actor_id, splits) in &diff { + for split in splits { + after_split_to_actor.insert(split.id(), *actor_id); + } + } + + let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect(); + + let retained_split_ids: HashSet<_> = + prev_split_ids.intersection(&discovered_split_ids).collect(); + + for retained_split_id in retained_split_ids { + assert_eq!( + prev_split_to_actor.get(retained_split_id), + after_split_to_actor.get(retained_split_id) + ) + } + } + + #[test] + fn test_drop_splits_to_empty() { + let mut actor_splits: HashMap = HashMap::new(); + actor_splits.insert(0, vec![TestSplit { id: 0 }]); + + let discovered_splits: BTreeMap = BTreeMap::new(); + + let opts = SplitDiffOptions { + enable_scale_in: true, + enable_adaptive: false, + }; + + let diff = reassign_splits( + FragmentId::default(), + actor_splits, + &discovered_splits, + opts, + ) + .unwrap(); + + assert!(!diff.is_empty()) + } + + #[test] + fn test_reassign_splits() { + let actor_splits = HashMap::new(); + let discovered_splits: BTreeMap = BTreeMap::new(); + assert!(reassign_splits( + 
+
+    #[test]
+    fn test_reassign_splits() {
+        let actor_splits = HashMap::new();
+        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
+        assert!(reassign_splits(
+            FragmentId::default(),
+            actor_splits,
+            &discovered_splits,
+            Default::default()
+        )
+        .is_none());
+
+        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
+        let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
+        let diff = reassign_splits(
+            FragmentId::default(),
+            actor_splits,
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 3);
+        for splits in diff.values() {
+            assert!(splits.is_empty())
+        }
+
+        let actor_splits = (0..3).map(|i| (i, vec![])).collect();
+        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
+            .map(|i| {
+                let split = TestSplit { id: i };
+                (split.id(), split)
+            })
+            .collect();
+
+        let diff = reassign_splits(
+            FragmentId::default(),
+            actor_splits,
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 3);
+        for splits in diff.values() {
+            assert_eq!(splits.len(), 1);
+        }
+
+        check_all_splits(&discovered_splits, &diff);
+
+        let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
+        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
+            .map(|i| {
+                let split = TestSplit { id: i };
+                (split.id(), split)
+            })
+            .collect();
+
+        let diff = reassign_splits(
+            FragmentId::default(),
+            actor_splits,
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 3);
+        for splits in diff.values() {
+            let len = splits.len();
+            assert!(len == 1 || len == 2);
+        }
+
+        check_all_splits(&discovered_splits, &diff);
+
+        let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
+            (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
+        actor_splits.insert(3, vec![]);
+        actor_splits.insert(4, vec![]);
+
+        let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
+            .map(|i| {
+                let split = TestSplit { id: i };
+                (split.id(), split)
+            })
+            .collect();
+
+        let diff = reassign_splits(
+            FragmentId::default(),
+            actor_splits,
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 5);
+        for splits in diff.values() {
+            assert_eq!(splits.len(), 1);
+        }
+
+        check_all_splits(&discovered_splits, &diff);
+    }
+}
diff --git a/src/meta/src/stream/source_manager/worker.rs b/src/meta/src/stream/source_manager/worker.rs
new file mode 100644
index 0000000000000..6f6d647ffb78f
--- /dev/null
+++ b/src/meta/src/stream/source_manager/worker.rs
@@ -0,0 +1,320 @@
+// Copyright 2025 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::*;
+
+const MAX_FAIL_CNT: u32 = 10;
+const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub struct SharedSplitMap {
+    pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
+}
+
+type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
+
+/// `ConnectorSourceWorker` keeps fetching the latest split metadata from the external source service ([`Self::tick`]),
+/// and maintains it in `current_splits`.
+pub struct ConnectorSourceWorker<P: SourceProperties> {
+    source_id: SourceId,
+    source_name: String,
+    current_splits: SharedSplitMapRef,
+    enumerator: P::SplitEnumerator,
+    period: Duration,
+    metrics: Arc<MetaMetrics>,
+    connector_properties: P,
+    fail_cnt: u32,
+    source_is_up: LabelGuardedIntGauge<2>,
+}
+
+fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
+    let options_with_secret =
+        WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
+    let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
+    properties.init_from_pb_source(source);
+    Ok(properties)
+}
+
+fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
+    let options_with_secret =
+        WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
+    let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
+    properties.init_from_pb_source(source);
+    Ok(properties)
+}
+
+/// Used to create a new [`ConnectorSourceWorkerHandle`] for a new source.
+///
+/// It will call [`ConnectorSourceWorker::tick()`] to fetch split metadata once before returning.
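+///
+/// A hedged usage sketch (hypothetical caller code, not part of this patch):
+/// since the first tick has already completed, the returned handle can serve
+/// split discovery immediately.
+///
+/// ```ignore
+/// let handle = create_source_worker(&source, metrics.clone()).await?;
+/// let splits = handle.discovered_splits(source_id, &actors).await?;
+/// ```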
+pub async fn create_source_worker(
+    source: &Source,
+    metrics: Arc<MetaMetrics>,
+) -> MetaResult<ConnectorSourceWorkerHandle> {
+    tracing::info!("spawning new watcher for source {}", source.id);
+
+    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
+    let current_splits_ref = splits.clone();
+
+    let connector_properties = extract_prop_from_new_source(source)?;
+    let enable_scale_in = connector_properties.enable_drop_split();
+    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
+    let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
+    let handle = dispatch_source_prop!(connector_properties, prop, {
+        let mut worker = ConnectorSourceWorker::create(
+            source,
+            *prop,
+            DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
+            current_splits_ref.clone(),
+            metrics,
+        )
+        .await?;
+
+        // If we fail to fetch the metadata here, refuse to create the source.
+
+        // TODO: make the timeout configurable; it should be longer than
+        // `properties.sync.call.timeout` in Kafka.
+        tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick())
+            .await
+            .ok()
+            .with_context(|| {
+                format!(
+                    "failed to fetch meta info for source {}, timeout {:?}",
+                    source.id, DEFAULT_SOURCE_TICK_TIMEOUT
+                )
+            })??;
+
+        tokio::spawn(async move { worker.run(sync_call_rx).await })
+    });
+    Ok(ConnectorSourceWorkerHandle {
+        handle,
+        sync_call_tx,
+        splits,
+        enable_drop_split: enable_scale_in,
+        enable_adaptive_splits,
+    })
+}
+
+/// Used on startup ([`SourceManager::new`]). Failed sources will not block meta startup.
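+///
+/// A hedged sketch of the startup path (hypothetical caller code, not part of
+/// this patch): property-extraction errors surface immediately, while
+/// connection failures are retried inside the spawned task.
+///
+/// ```ignore
+/// let mut managed_sources = HashMap::new();
+/// for source in sources {
+///     create_source_worker_async(source, &mut managed_sources, metrics.clone())?;
+/// }
+/// ```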
+pub fn create_source_worker_async(
+    source: Source,
+    managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
+    metrics: Arc<MetaMetrics>,
+) -> MetaResult<()> {
+    tracing::info!("spawning new watcher for source {}", source.id);
+
+    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
+    let current_splits_ref = splits.clone();
+    let source_id = source.id;
+
+    let connector_properties = extract_prop_from_existing_source(&source)?;
+
+    let enable_drop_split = connector_properties.enable_drop_split();
+    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
+    let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
+    let handle = tokio::spawn(async move {
+        let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
+        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+        dispatch_source_prop!(connector_properties, prop, {
+            let mut worker = loop {
+                ticker.tick().await;
+
+                match ConnectorSourceWorker::create(
+                    &source,
+                    prop.deref().clone(),
+                    DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
+                    current_splits_ref.clone(),
+                    metrics.clone(),
+                )
+                .await
+                {
+                    Ok(worker) => {
+                        break worker;
+                    }
+                    Err(e) => {
+                        tracing::warn!(error = %e.as_report(), "failed to create source worker");
+                    }
+                }
+            };
+
+            worker.run(sync_call_rx).await
+        });
+    });
+
+    managed_sources.insert(
+        source_id as SourceId,
+        ConnectorSourceWorkerHandle {
+            handle,
+            sync_call_tx,
+            splits,
+            enable_drop_split,
+            enable_adaptive_splits,
+        },
+    );
+    Ok(())
+}
+
+const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
+
+impl<P: SourceProperties> ConnectorSourceWorker<P> {
{ + /// Recreate the `SplitEnumerator` to establish a new connection to the external source service. + async fn refresh(&mut self) -> MetaResult<()> { + let enumerator = P::SplitEnumerator::new( + self.connector_properties.clone(), + Arc::new(SourceEnumeratorContext { + metrics: self.metrics.source_enumerator_metrics.clone(), + info: SourceEnumeratorInfo { + source_id: self.source_id as u32, + }, + }), + ) + .await + .context("failed to create SplitEnumerator")?; + self.enumerator = enumerator; + self.fail_cnt = 0; + tracing::info!("refreshed source enumerator: {}", self.source_name); + Ok(()) + } + + /// On creation, connection to the external source service will be established, but `splits` + /// will not be updated until `tick` is called. + pub async fn create( + source: &Source, + connector_properties: P, + period: Duration, + splits: Arc>, + metrics: Arc, + ) -> MetaResult { + let enumerator = P::SplitEnumerator::new( + connector_properties.clone(), + Arc::new(SourceEnumeratorContext { + metrics: metrics.source_enumerator_metrics.clone(), + info: SourceEnumeratorInfo { + source_id: source.id, + }, + }), + ) + .await + .context("failed to create SplitEnumerator")?; + + let source_is_up = metrics + .source_is_up + .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]); + + Ok(Self { + source_id: source.id as SourceId, + source_name: source.name.clone(), + current_splits: splits, + enumerator, + period, + metrics, + connector_properties, + fail_cnt: 0, + source_is_up, + }) + } + + pub async fn run( + &mut self, + mut sync_call_rx: UnboundedReceiver>>, + ) { + let mut interval = time::interval(self.period); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + select! { + biased; + tx = sync_call_rx.borrow_mut().recv() => { + if let Some(tx) = tx { + let _ = tx.send(self.tick().await); + } + } + _ = interval.tick() => { + if self.fail_cnt > MAX_FAIL_CNT { + if let Err(e) = self.refresh().await { + tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker"); + } + } + if let Err(e) = self.tick().await { + tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker"); + } + } + } + } + } + + /// Uses [`SplitEnumerator`] to fetch the latest split metadata from the external source service. + async fn tick(&mut self) -> MetaResult<()> { + let source_is_up = |res: i64| { + self.source_is_up.set(res); + }; + let splits = self.enumerator.list_splits().await.inspect_err(|_| { + source_is_up(0); + self.fail_cnt += 1; + })?; + source_is_up(1); + self.fail_cnt = 0; + let mut current_splits = self.current_splits.lock().await; + current_splits.splits.replace( + splits + .into_iter() + .map(|split| (split.id(), P::Split::into(split))) + .collect(), + ); + + Ok(()) + } +} + +/// Handle for a running [`ConnectorSourceWorker`]. 
+pub struct ConnectorSourceWorkerHandle {
+    pub handle: JoinHandle<()>,
+    pub sync_call_tx: UnboundedSender<oneshot::Sender<MetaResult<()>>>,
+    pub splits: SharedSplitMapRef,
+    pub enable_drop_split: bool,
+    pub enable_adaptive_splits: bool,
+}
+
+impl ConnectorSourceWorkerHandle {
+    pub fn get_enable_adaptive_splits(&self) -> bool {
+        self.enable_adaptive_splits
+    }
+
+    pub async fn discovered_splits(
+        &self,
+        source_id: SourceId,
+        actors: &HashSet<ActorId>,
+    ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
+        let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
+            tracing::info!(
+                "The discovery loop for source {} is not ready yet; we'll wait for the next run",
+                source_id
+            );
+            return Ok(BTreeMap::new());
+        };
+        if discovered_splits.is_empty() {
+            tracing::warn!("No splits discovered for source {}", source_id);
+        }
+
+        if self.enable_adaptive_splits {
+            // A connector that supports adaptive splits returns just one split, and we need to
+            // make the number of splits equal to the number of actors in this fragment.
+            // Because RisingWave consumes the splits statelessly and does not need to keep the
+            // split id internally, we always use the actor_id as the split_id.
+            // The previous split records should be dropped via the compute node.
+            debug_assert!(self.enable_drop_split);
+            debug_assert_eq!(discovered_splits.len(), 1);
+            discovered_splits =
+                fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
+        }
+
+        Ok(discovered_splits)
+    }
+}