feat: delete Kafka consumer group when source backfill is finished (#20077)

Signed-off-by: xxchan <[email protected]>
xxchan authored Jan 17, 2025
1 parent 27efddf commit 0695831
Showing 12 changed files with 118 additions and 47 deletions.
5 changes: 5 additions & 0 deletions e2e_test/source_inline/commands.toml
@@ -35,6 +35,11 @@ script = '''
set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka consumer groups..."
rpk group list | tail -n +2 | awk '{print $2}' | while read -r group; do
echo "Deleting Kafka consumer group: $group"
rpk group delete "$group"
done
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
echo "Deleting all schema registry subjects"
20 changes: 3 additions & 17 deletions e2e_test/source_inline/kafka/consumer_group.slt.serial
@@ -58,28 +58,14 @@ EOF
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0


# The lag for MV's group is 0.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
----
0


# Test delete consumer group on drop

# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
# TODO: drop backfill fragment on backfill finish
# my_group: 1 source fragment, 1 batch query, (1 backfill fragment's group is already dropped after backfill finished)
# We only check my_group to avoid interfering with other tests.
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 3
my_group: 2


statement ok
@@ -90,7 +76,7 @@ sleep 1s
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 2
my_group: 1


system ok
@@ -52,7 +52,7 @@ c
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0
8 changes: 4 additions & 4 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -189,10 +189,10 @@ select v1, v2 from mv_1;
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 4, ""state"": ""Finished"", ""target_offset"": ""2""}"
0,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""2""}"


system ok
9 changes: 9 additions & 0 deletions src/connector/src/source/base.rs
@@ -196,6 +196,10 @@ pub trait SplitEnumerator: Sized + Send {
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
Ok(())
}
/// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
Ok(())
}
}

pub type SourceContextRef = Arc<SourceContext>;
@@ -206,6 +210,7 @@ pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
pub trait AnySplitEnumerator: Send {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
@@ -219,6 +224,10 @@ impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
}

async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
}
}

/// The max size of a chunk yielded by source stream.
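The hook lands as a defaulted trait method on `SplitEnumerator` (and is threaded through `AnySplitEnumerator`), so connectors with no per-fragment broker state inherit a no-op. A minimal, self-contained sketch of the pattern, using simplified types and hypothetical enumerator names rather than the connector crate's real definitions:

use async_trait::async_trait;

#[async_trait]
trait SplitEnumerator: Send {
    /// Cleanup hook invoked when a job's source backfill fragments finish.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> anyhow::Result<()> {
        Ok(()) // default: nothing to clean up
    }
}

struct NoopEnumerator; // a connector without per-fragment broker state

#[async_trait]
impl SplitEnumerator for NoopEnumerator {} // inherits the no-op default

struct KafkaLikeEnumerator;

#[async_trait]
impl SplitEnumerator for KafkaLikeEnumerator {
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> anyhow::Result<()> {
        // The real Kafka enumerator deletes the per-fragment consumer groups here.
        println!("deleting consumer groups for fragments {fragment_ids:?}");
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut kafka = KafkaLikeEnumerator;
    kafka.on_finish_backfill(vec![1, 2]).await?;
    let mut noop = NoopEnumerator;
    noop.on_finish_backfill(vec![3]).await
}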
54 changes: 31 additions & 23 deletions src/connector/src/source/kafka/enumerator.rs
@@ -75,7 +75,33 @@ pub struct KafkaSplitEnumerator {
config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {}
impl KafkaSplitEnumerator {
async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
let admin = SHARED_KAFKA_ADMIN
.try_get_with_by_ref(&self.properties.connection, async {
tracing::info!("build new kafka admin for {}", self.broker_address);
Ok(Arc::new(
build_kafka_admin(&self.config, &self.properties).await?,
))
})
.await?;

let group_ids = fragment_ids
.iter()
.map(|fragment_id| self.properties.group_id(*fragment_id))
.collect::<Vec<_>>();
let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
let res = admin
.delete_groups(&group_ids, &AdminOptions::default())
.await?;
tracing::debug!(
topic = self.topic,
?fragment_ids,
"delete groups result: {res:?}"
);
Ok(())
}
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
@@ -178,29 +204,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
}

async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
let admin = SHARED_KAFKA_ADMIN
.try_get_with_by_ref(&self.properties.connection, async {
tracing::info!("build new kafka admin for {}", self.broker_address);
Ok(Arc::new(
build_kafka_admin(&self.config, &self.properties).await?,
))
})
.await?;
self.drop_consumer_groups(fragment_ids).await
}

let group_ids = fragment_ids
.iter()
.map(|fragment_id| self.properties.group_id(*fragment_id))
.collect::<Vec<_>>();
let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
let res = admin
.delete_groups(&group_ids, &AdminOptions::default())
.await?;
tracing::debug!(
topic = self.topic,
?fragment_ids,
"delete groups result: {res:?}"
);
Ok(())
async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
self.drop_consumer_groups(fragment_ids).await
}
}

Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/source/reader.rs
@@ -327,5 +327,6 @@ impl KafkaSplitReader {
// yield in the outer loop so that we can always guarantee that some messages are read
// every `MAX_CHUNK_SIZE`.
}
tracing::info!("kafka reader finished");
}
}
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -977,6 +977,7 @@ impl DdlController {
.await?;
if aborted {
tracing::warn!(id = job_id, "aborted streaming job");
// FIXME: might also need other cleanup here
if let Some(source_id) = source_id {
self.source_manager
.apply_source_change(SourceChange::DropSource {
27 changes: 26 additions & 1 deletion src/meta/src/stream/source_manager.rs
@@ -105,6 +105,7 @@ impl SourceManagerCore {
pub fn apply_source_change(&mut self, source_change: SourceChange) {
let mut added_source_fragments = Default::default();
let mut added_backfill_fragments = Default::default();
let mut finished_backfill_fragments = Default::default();
let mut split_assignment = Default::default();
let mut dropped_actors = Default::default();
let mut fragment_replacements = Default::default();
@@ -121,6 +122,11 @@ impl SourceManagerCore {
added_backfill_fragments = added_backfill_fragments_;
split_assignment = split_assignment_;
}
SourceChange::CreateJobFinished {
finished_backfill_fragments: finished_backfill_fragments_,
} => {
finished_backfill_fragments = finished_backfill_fragments_;
}
SourceChange::SplitChange(split_assignment_) => {
split_assignment = split_assignment_;
}
@@ -188,6 +194,16 @@ impl SourceManagerCore {
.extend(fragments);
}

for (source_id, fragments) in finished_backfill_fragments {
let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
panic!(
"source {} not found when adding backfill fragments {:?}",
source_id, fragments
);
});
handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
}

for (_, actor_splits) in split_assignment {
for (actor_id, splits) in actor_splits {
// override previous splits info
@@ -477,12 +493,21 @@ impl SourceManager {
}

pub enum SourceChange {
/// `CREATE SOURCE` (shared), or `CREATE MV`
/// `CREATE SOURCE` (shared), or `CREATE MV`.
/// This is applied after the job is successfully created (`post_collect` barrier).
CreateJob {
added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
/// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
split_assignment: SplitAssignment,
},
/// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
/// This is applied after `wait_streaming_job_finished`.
/// XXX: Should we merge `CreateJob` into this?
CreateJobFinished {
/// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
},
SplitChange(SplitAssignment),
/// `DROP SOURCE` or `DROP MV`
DropSource {
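`CreateJobFinished` carries, per source, the `(backfill_fragment_id, upstream_source_fragment_id)` pairs; only the backfill fragment id is forwarded to that source's worker, since the upstream source fragment keeps using its own consumer group. A small self-contained sketch of that projection, using plain integer ids and a hypothetical helper name:

use std::collections::{BTreeSet, HashMap};

type SourceId = u32;
type FragmentId = u32;

/// Hypothetical helper mirroring what `apply_source_change` does for
/// `SourceChange::CreateJobFinished`: keep only the backfill fragment ids.
fn backfill_fragments_to_finish(
    finished: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
) -> HashMap<SourceId, Vec<FragmentId>> {
    finished
        .into_iter()
        .map(|(source_id, fragments)| {
            let backfill_ids = fragments
                .iter()
                .map(|(backfill_id, _upstream_id)| *backfill_id)
                .collect();
            (source_id, backfill_ids)
        })
        .collect()
}

fn main() {
    let mut finished = HashMap::new();
    // Source 1 has two backfill fragments (10, 11), both reading from source fragment 2.
    finished.insert(1, BTreeSet::from([(10, 2), (11, 2)]));
    // The source's worker would receive [10, 11] and delete the matching groups.
    println!("{:?}", backfill_fragments_to_finish(finished));
}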
23 changes: 23 additions & 0 deletions src/meta/src/stream/source_manager/worker.rs
@@ -241,6 +241,12 @@ impl ConnectorSourceWorker {
tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
}
}
SourceWorkerCommand::FinishBackfill(fragment_ids) => {
if let Err(e) = self.finish_backfill(fragment_ids).await {
// when error happens, we just log it and ignore
tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
}
}
SourceWorkerCommand::Terminate => {
return;
}
@@ -287,6 +293,11 @@ impl ConnectorSourceWorker {
self.enumerator.on_drop_fragments(fragment_ids).await?;
Ok(())
}

async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
self.enumerator.on_finish_backfill(fragment_ids).await?;
Ok(())
}
}

/// Handle for a running [`ConnectorSourceWorker`].
@@ -354,13 +365,23 @@ impl ConnectorSourceWorkerHandle {
}

pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
tracing::debug!("drop_fragments: {:?}", fragment_ids);
if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
// ignore drop fragment error, just log it
tracing::warn!(error = %e.as_report(), "failed to drop fragments");
}
}

pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
tracing::debug!("finish_backfill: {:?}", fragment_ids);
if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
// ignore error, just log it
tracing::warn!(error = %e.as_report(), "failed to finish backfill");
}
}

pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
tracing::debug!("terminate: {:?}", dropped_fragments);
if let Some(dropped_fragments) = dropped_fragments {
self.drop_fragments(dropped_fragments.into_iter().collect());
}
@@ -378,6 +399,8 @@ pub enum SourceWorkerCommand {
Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
/// Async command to drop a fragment.
DropFragments(Vec<FragmentId>),
/// Async command to finish backfill.
FinishBackfill(Vec<FragmentId>),
/// Terminate the worker task.
Terminate,
}
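
`FinishBackfill` follows the same fire-and-forget pattern as `DropFragments`: the handle sends a command to the worker task, and a cleanup failure is only logged so it never blocks the stream-job lifecycle. A stripped-down sketch of such a worker loop, with hypothetical channel wiring and a stub in place of the enumerator call:

use tokio::sync::mpsc;

#[derive(Debug)]
enum SourceWorkerCommand {
    DropFragments(Vec<u32>),
    FinishBackfill(Vec<u32>),
    Terminate,
}

/// Stand-in for `enumerator.on_drop_fragments` / `enumerator.on_finish_backfill`.
async fn cleanup_consumer_groups(fragment_ids: &[u32]) -> Result<(), String> {
    println!("deleting consumer groups for fragments {fragment_ids:?}");
    Ok(())
}

async fn run_worker(mut rx: mpsc::UnboundedReceiver<SourceWorkerCommand>) {
    while let Some(cmd) = rx.recv().await {
        match cmd {
            SourceWorkerCommand::DropFragments(ids)
            | SourceWorkerCommand::FinishBackfill(ids) => {
                if let Err(e) = cleanup_consumer_groups(&ids).await {
                    // Best effort: log the error and keep the worker alive.
                    eprintln!("consumer group cleanup failed: {e}");
                }
            }
            SourceWorkerCommand::Terminate => return,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let worker = tokio::spawn(run_worker(rx));
    tx.send(SourceWorkerCommand::FinishBackfill(vec![1, 2])).unwrap();
    tx.send(SourceWorkerCommand::DropFragments(vec![3])).unwrap();
    tx.send(SourceWorkerCommand::Terminate).unwrap();
    worker.await.unwrap();
}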
7 changes: 6 additions & 1 deletion src/meta/src/stream/stream_manager.rs
@@ -41,7 +41,7 @@ use crate::manager::{
MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::SourceManagerRef;
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
@@ -392,6 +392,10 @@ impl GlobalStreamManager {
.await?,
);

let source_change = SourceChange::CreateJobFinished {
finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
};

let info = CreateStreamingJobCommandInfo {
stream_job_fragments,
upstream_root_actors,
@@ -444,6 +448,7 @@ impl GlobalStreamManager {
.metadata_manager
.wait_streaming_job_finished(streaming_job.id() as _)
.await?;
self.source_manager.apply_source_change(source_change).await;
tracing::debug!(?streaming_job, "stream job finish");
Ok(version)
}
8 changes: 8 additions & 0 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -636,6 +636,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.await?;

if self.should_report_finished(&backfill_stage.states) {
// drop the backfill kafka consumers
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
futures::stream::pending().boxed().map(Either::Right),
select_strategy,
);

self.progress.finish(
barrier.epoch,
backfill_stage.total_backfilled_rows(),
@@ -734,6 +741,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}

std::mem::drop(backfill_stream);
let mut states = backfill_stage.states;
// Make sure `Finished` state is persisted.
self.backfill_state_store.set_states(states.clone()).await?;
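Once backfill reports finished, the executor rebuilds the select with a `pending()` right-hand side, and the later `std::mem::drop(backfill_stream)` makes the release explicit when the loop exits; in both cases dropping the old backfill stream is what releases the backfill Kafka consumer. A minimal standalone sketch of the swap, using toy iterator streams and boxed sides so the rebuilt select has the same type:

use futures::stream::{self, PollNext, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // Left side stands in for the upstream-driven input, right side for the
        // backfill stream that owns the Kafka consumer.
        let strategy = |_: &mut ()| PollNext::Left;
        let mut merged = stream::select_with_strategy(
            stream::iter(vec!["upstream-1", "upstream-2"]).boxed(),
            stream::iter(vec!["backfill-1"]).boxed(),
            strategy,
        );
        for _ in 0..3 {
            println!("{:?}", merged.next().await);
        }

        // Backfill finished: rebuild the select with a never-ready right side.
        // Assigning over `merged` drops the previous select, and with it the
        // backfill stream and whatever resources it owned.
        merged = stream::select_with_strategy(
            stream::iter(vec!["upstream-3"]).boxed(),
            stream::pending::<&str>().boxed(),
            strategy,
        );
        println!("{:?}", merged.next().await);
    });
}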
