diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml
index 57d09d8237efa..3300b6e3a5df5 100644
--- a/e2e_test/source_inline/commands.toml
+++ b/e2e_test/source_inline/commands.toml
@@ -35,6 +35,11 @@ script = '''
 set -e
 if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
+    echo "Deleting all Kafka consumer groups..."
+    rpk group list | tail -n +2 | awk '{print $2}' | while read -r group; do
+        echo "Deleting Kafka consumer group: $group"
+        rpk group delete "$group"
+    done
     echo "Deleting all Kafka topics..."
     rpk topic delete -r "*"
     echo "Deleting all schema registry subjects"
diff --git a/e2e_test/source_inline/kafka/consumer_group.slt.serial b/e2e_test/source_inline/kafka/consumer_group.slt.serial
index 29f41c82d6168..ed6ce13e3c006 100644
--- a/e2e_test/source_inline/kafka/consumer_group.slt.serial
+++ b/e2e_test/source_inline/kafka/consumer_group.slt.serial
@@ -58,28 +58,14 @@ EOF
 # (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
 sleep 5s
 
-system ok
-./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
-----
-0
-
-
-# The lag for MV's group is 0.
-system ok
-./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
-----
-0
-
-
 # Test delete consumer group on drop
-# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
-# TODO: drop backfill fragment on backfill finish
+# my_group: 1 source fragment, 1 batch query, (1 backfill fragment's group is already dropped after backfill finished)
 # We only check my_group to avoid interfering with other tests.
 system ok retry 3 backoff 5s
 ./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
 ----
-my_group: 3
+my_group: 2
 
 
 statement ok
@@ -90,7 +76,7 @@ sleep 1s
 system ok retry 3 backoff 5s
 ./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
 ----
-my_group: 2
+my_group: 1
 
 
 system ok
diff --git a/e2e_test/source_inline/kafka/consumer_group_non_shared.slt.serial b/e2e_test/source_inline/kafka/consumer_group_non_shared.slt.serial
index 2009bf53f8968..3111673f0a5e0 100644
--- a/e2e_test/source_inline/kafka/consumer_group_non_shared.slt.serial
+++ b/e2e_test/source_inline/kafka/consumer_group_non_shared.slt.serial
@@ -52,7 +52,7 @@ c
 # (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
 sleep 5s
 
-system ok
+system ok retry 3 backoff 5s
 ./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
 ----
 0
diff --git a/e2e_test/source_inline/kafka/shared_source.slt.serial b/e2e_test/source_inline/kafka/shared_source.slt.serial
index af6b371d21c49..42d4cf86725ef 100644
--- a/e2e_test/source_inline/kafka/shared_source.slt.serial
+++ b/e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -189,10 +189,10 @@ select v1, v2 from mv_1;
 system ok
 internal_table.mjs --name mv_1 --type sourcebackfill
 ----
-0,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
-1,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
-2,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""1""}"
-3,"{""num_consumed_rows"": 4, ""state"": ""Finished"", ""target_offset"": ""2""}"
+0,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
+1,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
+2,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""1""}"
+3,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""2""}"
 
 
 system ok
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 28f3a756ea5c9..406d678a65ee6 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -196,6 +196,10 @@ pub trait SplitEnumerator: Sized + Send {
     async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
         Ok(())
     }
+    /// Do some cleanup work when a backfill fragment is finished, e.g., drop the Kafka consumer group.
+    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
+        Ok(())
+    }
 }
 
 pub type SourceContextRef = Arc<SourceContext>;
@@ -206,6 +210,7 @@ pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
 pub trait AnySplitEnumerator: Send {
     async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
     async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()>;
+    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()>;
 }
 
 #[async_trait]
@@ -219,6 +224,10 @@ impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
     async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
         SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
     }
+
+    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
+        SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
+    }
 }
 
 /// The max size of a chunk yielded by source stream.
diff --git a/src/connector/src/source/kafka/enumerator.rs b/src/connector/src/source/kafka/enumerator.rs
index 00e3ead9049b9..42045b20ba2e0 100644
--- a/src/connector/src/source/kafka/enumerator.rs
+++ b/src/connector/src/source/kafka/enumerator.rs
@@ -75,7 +75,33 @@ pub struct KafkaSplitEnumerator {
     config: rdkafka::ClientConfig,
 }
 
-impl KafkaSplitEnumerator {}
+impl KafkaSplitEnumerator {
+    async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
+        let admin = SHARED_KAFKA_ADMIN
+            .try_get_with_by_ref(&self.properties.connection, async {
+                tracing::info!("build new kafka admin for {}", self.broker_address);
+                Ok(Arc::new(
+                    build_kafka_admin(&self.config, &self.properties).await?,
+                ))
+            })
+            .await?;
+
+        let group_ids = fragment_ids
+            .iter()
+            .map(|fragment_id| self.properties.group_id(*fragment_id))
+            .collect::<Vec<_>>();
+        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
+        let res = admin
+            .delete_groups(&group_ids, &AdminOptions::default())
+            .await?;
+        tracing::debug!(
+            topic = self.topic,
+            ?fragment_ids,
+            "delete groups result: {res:?}"
+        );
+        Ok(())
+    }
+}
 
 #[async_trait]
 impl SplitEnumerator for KafkaSplitEnumerator {
@@ -178,29 +204,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
     }
 
     async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
-        let admin = SHARED_KAFKA_ADMIN
-            .try_get_with_by_ref(&self.properties.connection, async {
-                tracing::info!("build new kafka admin for {}", self.broker_address);
-                Ok(Arc::new(
-                    build_kafka_admin(&self.config, &self.properties).await?,
-                ))
-            })
-            .await?;
+        self.drop_consumer_groups(fragment_ids).await
+    }
 
-        let group_ids = fragment_ids
-            .iter()
-            .map(|fragment_id| self.properties.group_id(*fragment_id))
-            .collect::<Vec<_>>();
-        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
-        let res = admin
-            .delete_groups(&group_ids, &AdminOptions::default())
-            .await?;
-        tracing::debug!(
-            topic = self.topic,
-            ?fragment_ids,
-            "delete groups result: {res:?}"
-        );
-        Ok(())
+    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
+        self.drop_consumer_groups(fragment_ids).await
     }
 }
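
The two hunks above add an `on_finish_backfill` hook next to the existing `on_drop_fragments` hook: the trait supplies a no-op default, and the Kafka enumerator routes both hooks through the shared `drop_consumer_groups` helper. Below is a minimal, self-contained sketch of that pattern, not RisingWave code: the trait and the `FakeKafkaEnumerator` type are simplified stand-ins, and it assumes only the `async-trait`, `anyhow`, and `futures` crates.

```rust
use async_trait::async_trait;

type FragmentId = u32;

#[async_trait]
trait SplitEnumerator: Send {
    async fn list_splits(&mut self) -> anyhow::Result<Vec<String>>;

    /// Default is a no-op so existing connectors need no changes.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> anyhow::Result<()> {
        Ok(())
    }

    /// New hook: cleanup once a backfill fragment reports `Finished`.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> anyhow::Result<()> {
        Ok(())
    }
}

struct FakeKafkaEnumerator;

impl FakeKafkaEnumerator {
    async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> anyhow::Result<()> {
        // Stand-in for the rdkafka AdminClient `delete_groups` call in the diff above.
        println!("deleting consumer groups for fragments {fragment_ids:?}");
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for FakeKafkaEnumerator {
    async fn list_splits(&mut self) -> anyhow::Result<Vec<String>> {
        Ok(vec!["topic-partition-0".into()])
    }

    // Both hooks share one cleanup path, mirroring `drop_consumer_groups`
    // in the Kafka enumerator.
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> anyhow::Result<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> anyhow::Result<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

fn main() -> anyhow::Result<()> {
    futures::executor::block_on(async {
        let mut enumerator = FakeKafkaEnumerator;
        enumerator.on_finish_backfill(vec![42]).await
    })
}
```
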
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index cc921bd01326f..e0eddb0dee507 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -327,5 +327,6 @@ impl KafkaSplitReader {
             // yield in the outer loop so that we can always guarantee that some messages are read
             // every `MAX_CHUNK_SIZE`.
         }
+        tracing::info!("kafka reader finished");
     }
 }
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 992f5f2610038..976206477f8c9 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -977,6 +977,7 @@ impl DdlController {
             .await?;
         if aborted {
             tracing::warn!(id = job_id, "aborted streaming job");
+            // FIXME: might also need other cleanup here
             if let Some(source_id) = source_id {
                 self.source_manager
                     .apply_source_change(SourceChange::DropSource {
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 3c883903232ba..367482e831fb4 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -105,6 +105,7 @@ impl SourceManagerCore {
     pub fn apply_source_change(&mut self, source_change: SourceChange) {
         let mut added_source_fragments = Default::default();
         let mut added_backfill_fragments = Default::default();
+        let mut finished_backfill_fragments = Default::default();
         let mut split_assignment = Default::default();
         let mut dropped_actors = Default::default();
         let mut fragment_replacements = Default::default();
@@ -121,6 +122,11 @@ impl SourceManagerCore {
                 added_backfill_fragments = added_backfill_fragments_;
                 split_assignment = split_assignment_;
             }
+            SourceChange::CreateJobFinished {
+                finished_backfill_fragments: finished_backfill_fragments_,
+            } => {
+                finished_backfill_fragments = finished_backfill_fragments_;
+            }
             SourceChange::SplitChange(split_assignment_) => {
                 split_assignment = split_assignment_;
             }
@@ -188,6 +194,16 @@ impl SourceManagerCore {
                 .extend(fragments);
         }
 
+        for (source_id, fragments) in finished_backfill_fragments {
+            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
+                panic!(
+                    "source {} not found when finishing backfill fragments {:?}",
+                    source_id, fragments
+                );
+            });
+            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
+        }
+
         for (_, actor_splits) in split_assignment {
             for (actor_id, splits) in actor_splits {
                 // override previous splits info
@@ -477,12 +493,21 @@ impl SourceManager {
 }
 
 pub enum SourceChange {
-    /// `CREATE SOURCE` (shared), or `CREATE MV`
+    /// `CREATE SOURCE` (shared), or `CREATE MV`.
+    /// This is applied after the job is successfully created (`post_collect` barrier).
     CreateJob {
         added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
+        /// (`source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
         added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
         split_assignment: SplitAssignment,
     },
+    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
+    /// This is applied after `wait_streaming_job_finished`.
+    /// XXX: Should we merge `CreateJob` into this?
+    CreateJobFinished {
+        /// (`source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
+        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
+    },
     SplitChange(SplitAssignment),
     /// `DROP SOURCE` or `DROP MV`
     DropSource {
diff --git a/src/meta/src/stream/source_manager/worker.rs b/src/meta/src/stream/source_manager/worker.rs
index 9c77803c63832..d870a72a3cb23 100644
--- a/src/meta/src/stream/source_manager/worker.rs
+++ b/src/meta/src/stream/source_manager/worker.rs
@@ -241,6 +241,12 @@ impl ConnectorSourceWorker {
                     tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
                 }
             }
+            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
+                if let Err(e) = self.finish_backfill(fragment_ids).await {
+                    // when an error happens, we just log it and ignore it
+                    tracing::warn!(error = %e.as_report(), "error happened when finishing backfill");
+                }
+            }
             SourceWorkerCommand::Terminate => {
                 return;
             }
@@ -287,6 +293,11 @@ impl ConnectorSourceWorker {
         self.enumerator.on_drop_fragments(fragment_ids).await?;
         Ok(())
     }
+
+    async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
+        self.enumerator.on_finish_backfill(fragment_ids).await?;
+        Ok(())
+    }
 }
 
 /// Handle for a running [`ConnectorSourceWorker`].
@@ -354,13 +365,23 @@ impl ConnectorSourceWorkerHandle {
     }
 
     pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
+        tracing::debug!("drop_fragments: {:?}", fragment_ids);
         if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
             // ignore drop fragment error, just log it
             tracing::warn!(error = %e.as_report(), "failed to drop fragments");
         }
     }
 
+    pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
+        tracing::debug!("finish_backfill: {:?}", fragment_ids);
+        if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
+            // ignore the error, just log it
+            tracing::warn!(error = %e.as_report(), "failed to finish backfill");
+        }
+    }
+
     pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
+        tracing::debug!("terminate: {:?}", dropped_fragments);
         if let Some(dropped_fragments) = dropped_fragments {
             self.drop_fragments(dropped_fragments.into_iter().collect());
         }
@@ -378,6 +399,8 @@ pub enum SourceWorkerCommand {
     Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
     /// Async command to drop a fragment.
     DropFragments(Vec<FragmentId>),
+    /// Async command to finish backfill.
+    FinishBackfill(Vec<FragmentId>),
     /// Terminate the worker task.
     Terminate,
 }
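
The worker changes above are the receiving end of the new cleanup path; the sending end is the `SourceChange::CreateJobFinished` applied by the stream manager in the next diff. End to end: the stream manager applies the change after `wait_streaming_job_finished`, `SourceManagerCore` calls `finish_backfill` on the per-source worker handle, the handle sends an async `SourceWorkerCommand::FinishBackfill`, and the worker forwards it to `SplitEnumerator::on_finish_backfill`, logging (not propagating) any error. A minimal, self-contained sketch of that fire-and-forget command loop, with simplified types and a hypothetical `delete_groups` stand-in (assumes only the `futures` crate, not the real worker machinery):

```rust
use futures::StreamExt;
use futures::channel::mpsc;

type FragmentId = u32;

#[derive(Debug)]
enum SourceWorkerCommand {
    FinishBackfill(Vec<FragmentId>),
    Terminate,
}

// Stand-in for `enumerator.on_finish_backfill(...)`.
async fn delete_groups(fragment_ids: &[FragmentId]) -> Result<(), String> {
    println!("deleting consumer groups for fragments {fragment_ids:?}");
    Ok(())
}

async fn worker_loop(mut rx: mpsc::UnboundedReceiver<SourceWorkerCommand>) {
    while let Some(cmd) = rx.next().await {
        match cmd {
            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
                // Cleanup failures are logged and swallowed, matching the diff:
                // a failed group deletion must not fail the CREATE MV job.
                if let Err(e) = delete_groups(&fragment_ids).await {
                    eprintln!("error happened when finishing backfill: {e}");
                }
            }
            SourceWorkerCommand::Terminate => return,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded();
    tx.unbounded_send(SourceWorkerCommand::FinishBackfill(vec![7, 8]))
        .unwrap();
    tx.unbounded_send(SourceWorkerCommand::Terminate).unwrap();
    futures::executor::block_on(worker_loop(rx));
}
```
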
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index d9e64ab3c660a..e20165fb50078 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -41,7 +41,7 @@ use crate::manager::{
     MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
 };
 use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism};
-use crate::stream::SourceManagerRef;
+use crate::stream::{SourceChange, SourceManagerRef};
 use crate::{MetaError, MetaResult};
 
 pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
@@ -392,6 +392,10 @@ impl GlobalStreamManager {
                 .await?,
         );
 
+        let source_change = SourceChange::CreateJobFinished {
+            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
+        };
+
         let info = CreateStreamingJobCommandInfo {
             stream_job_fragments,
             upstream_root_actors,
@@ -444,6 +448,7 @@ impl GlobalStreamManager {
             .metadata_manager
             .wait_streaming_job_finished(streaming_job.id() as _)
             .await?;
+        self.source_manager.apply_source_change(source_change).await;
         tracing::debug!(?streaming_job, "stream job finish");
         Ok(version)
     }
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index d960ea97ef859..d8117f86fb5dc 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -636,6 +636,13 @@ impl SourceBackfillExecutorInner {
                         .await?;
 
                     if self.should_report_finished(&backfill_stage.states) {
+                        // Drop the backfill Kafka consumers by swapping in a pending stream.
+                        backfill_stream = select_with_strategy(
+                            input.by_ref().map(Either::Left),
+                            futures::stream::pending().boxed().map(Either::Right),
+                            select_strategy,
+                        );
+
                         self.progress.finish(
                             barrier.epoch,
                             backfill_stage.total_backfilled_rows(),
@@ -734,6 +741,7 @@ impl SourceBackfillExecutorInner {
             }
         }
 
+        std::mem::drop(backfill_stream);
         let mut states = backfill_stage.states;
         // Make sure `Finished` state is persisted.
         self.backfill_state_store.set_states(states.clone()).await?;
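
The executor change is the piece that actually releases the Kafka reader early: once backfill reports finished, `backfill_stream` is rebuilt with `futures::stream::pending()` as its right arm, so the old backfill reader (and its consumer) is dropped immediately instead of surviving until the executor exits, and the later `std::mem::drop(backfill_stream)` covers the path that leaves the loop. A minimal, self-contained sketch of this swap-to-pending pattern with toy string items (not RisingWave code; assumes only the `futures` crate):

```rust
use futures::stream::{self, PollNext, StreamExt, select_with_strategy};

fn main() {
    futures::executor::block_on(async {
        // Upstream messages keep flowing after backfill is done.
        let upstream = stream::iter(["barrier-1", "chunk-a", "barrier-2"]);
        // Stand-in for the dropped backfill side: it never yields again.
        let backfill = stream::pending::<&str>();

        // Same strategy as the executor: always poll the left (upstream) arm first.
        let merged = select_with_strategy(upstream, backfill, |_: &mut ()| PollNext::Left);

        // The right arm is silent, so only the three upstream items arrive.
        let seen: Vec<_> = merged.take(3).collect().await;
        assert_eq!(seen, ["barrier-1", "chunk-a", "barrier-2"]);
        println!("{seen:?}");
    });
}
```

Dropping the reader only closes the client; the consumer group itself is deleted on the meta side via `on_finish_backfill`, which is why the consumer-group e2e test now expects `my_group: 2` instead of `my_group: 3`.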