feat: delete Kafka consumer group when source backfill is finished #20077

Merged
5 changes: 5 additions & 0 deletions e2e_test/source_inline/commands.toml
@@ -35,6 +35,11 @@ script = '''
set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka consumer groups..."
rpk group list | tail -n +2 | awk '{print $2}' | while read -r group; do
echo "Deleting Kafka consumer group: $group"
rpk group delete "$group"
done
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
echo "Deleting all schema registry subjects"
20 changes: 3 additions & 17 deletions e2e_test/source_inline/kafka/consumer_group.slt.serial
@@ -58,28 +58,14 @@ EOF
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0


# The lag for MV's group is 0.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
----
0


# Test delete consumer group on drop

# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
# TODO: drop backfill fragment on backfill finish
# my_group: 1 source fragment, 1 batch query, (1 backfill fragment's group is already dropped after backfill finished)
# We only check my_group to avoid interfering with other tests.
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 3
my_group: 2


statement ok
@@ -90,7 +76,7 @@ sleep 1s
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 2
my_group: 1


system ok
Expand Up @@ -52,7 +52,7 @@ c
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0
8 changes: 4 additions & 4 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -189,10 +189,10 @@ select v1, v2 from mv_1;
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 4, ""state"": ""Finished"", ""target_offset"": ""2""}"
0,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""2""}"


system ok
9 changes: 9 additions & 0 deletions src/connector/src/source/base.rs
@@ -196,6 +196,10 @@ pub trait SplitEnumerator: Sized + Send {
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
Ok(())
}
/// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
Ok(())
}
}

pub type SourceContextRef = Arc<SourceContext>;
Expand All @@ -206,6 +210,7 @@ pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
pub trait AnySplitEnumerator: Send {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
@@ -219,6 +224,10 @@ impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
}

async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
SplitEnumerator::on_finish_backfill(self, _fragment_ids).await
}
}

/// The max size of a chunk yielded by source stream.
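A minimal, self-contained sketch of the hook pattern added above (illustrative names, not the actual RisingWave types): the trait ships a no-op default for on_finish_backfill, so only enumerators that own external state, such as Kafka consumer groups, need to override it.

use async_trait::async_trait;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[async_trait]
trait SplitEnumerator: Send {
    /// Called once a job's backfill fragments are finished; defaults to a no-op.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
        Ok(())
    }
}

struct KafkaLikeEnumerator;

#[async_trait]
impl SplitEnumerator for KafkaLikeEnumerator {
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> Result<()> {
        // A real implementation would delete one consumer group per finished fragment.
        println!("dropping consumer groups for fragments {fragment_ids:?}");
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut enumerator = KafkaLikeEnumerator;
    enumerator.on_finish_backfill(vec![7, 8]).await
}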
54 changes: 31 additions & 23 deletions src/connector/src/source/kafka/enumerator.rs
@@ -75,7 +75,33 @@ pub struct KafkaSplitEnumerator {
config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {}
impl KafkaSplitEnumerator {
async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
let admin = SHARED_KAFKA_ADMIN
.try_get_with_by_ref(&self.properties.connection, async {
tracing::info!("build new kafka admin for {}", self.broker_address);
Ok(Arc::new(
build_kafka_admin(&self.config, &self.properties).await?,
))
})
.await?;

let group_ids = fragment_ids
.iter()
.map(|fragment_id| self.properties.group_id(*fragment_id))
.collect::<Vec<_>>();
let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
let res = admin
.delete_groups(&group_ids, &AdminOptions::default())
.await?;
tracing::debug!(
topic = self.topic,
?fragment_ids,
"delete groups result: {res:?}"
);
Ok(())
}
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
@@ -178,29 +204,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
}

async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
let admin = SHARED_KAFKA_ADMIN
.try_get_with_by_ref(&self.properties.connection, async {
tracing::info!("build new kafka admin for {}", self.broker_address);
Ok(Arc::new(
build_kafka_admin(&self.config, &self.properties).await?,
))
})
.await?;
self.drop_consumer_groups(fragment_ids).await
}

let group_ids = fragment_ids
.iter()
.map(|fragment_id| self.properties.group_id(*fragment_id))
.collect::<Vec<_>>();
let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
let res = admin
.delete_groups(&group_ids, &AdminOptions::default())
.await?;
tracing::debug!(
topic = self.topic,
?fragment_ids,
"delete groups result: {res:?}"
);
Ok(())
async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
self.drop_consumer_groups(fragment_ids).await
}
}

1 change: 1 addition & 0 deletions src/connector/src/source/kafka/source/reader.rs
@@ -327,5 +327,6 @@ impl KafkaSplitReader {
// yield in the outer loop so that we can always guarantee that some messages are read
// every `MAX_CHUNK_SIZE`.
}
tracing::info!("kafka reader finished");
}
}
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -977,6 +977,7 @@ impl DdlController {
.await?;
if aborted {
tracing::warn!(id = job_id, "aborted streaming job");
// FIXME: might also need other cleanup here
if let Some(source_id) = source_id {
self.source_manager
.apply_source_change(SourceChange::DropSource {
27 changes: 26 additions & 1 deletion src/meta/src/stream/source_manager.rs
@@ -105,6 +105,7 @@ impl SourceManagerCore {
pub fn apply_source_change(&mut self, source_change: SourceChange) {
let mut added_source_fragments = Default::default();
let mut added_backfill_fragments = Default::default();
let mut finished_backfill_fragments = Default::default();
let mut split_assignment = Default::default();
let mut dropped_actors = Default::default();
let mut fragment_replacements = Default::default();
@@ -121,6 +122,11 @@
added_backfill_fragments = added_backfill_fragments_;
split_assignment = split_assignment_;
}
SourceChange::CreateJobFinished {
finished_backfill_fragments: finished_backfill_fragments_,
} => {
finished_backfill_fragments = finished_backfill_fragments_;
}
SourceChange::SplitChange(split_assignment_) => {
split_assignment = split_assignment_;
}
@@ -188,6 +194,16 @@
.extend(fragments);
}

for (source_id, fragments) in finished_backfill_fragments {
let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
panic!(
"source {} not found when adding backfill fragments {:?}",
source_id, fragments
);
});
handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
}

for (_, actor_splits) in split_assignment {
for (actor_id, splits) in actor_splits {
// override previous splits info
@@ -477,12 +493,21 @@
}

pub enum SourceChange {
/// `CREATE SOURCE` (shared), or `CREATE MV`
/// `CREATE SOURCE` (shared), or `CREATE MV`.
/// This is applied after the job is successfully created (`post_collect` barrier).
CreateJob {
added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
/// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
split_assignment: SplitAssignment,
},
/// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
/// This is applied after `wait_streaming_job_finished`.
/// XXX: Should we merge `CreateJob` into this?
CreateJobFinished {
/// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
},
SplitChange(SplitAssignment),
/// `DROP SOURCE` or `DROP MV`
DropSource {
23 changes: 23 additions & 0 deletions src/meta/src/stream/source_manager/worker.rs
@@ -241,6 +241,12 @@ impl ConnectorSourceWorker {
tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
}
}
SourceWorkerCommand::FinishBackfill(fragment_ids) => {
if let Err(e) = self.finish_backfill(fragment_ids).await {
// when error happens, we just log it and ignore
tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
}
}
SourceWorkerCommand::Terminate => {
return;
}
@@ -287,6 +293,11 @@
self.enumerator.on_drop_fragments(fragment_ids).await?;
Ok(())
}

async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
self.enumerator.on_finish_backfill(fragment_ids).await?;
Ok(())
}
}

/// Handle for a running [`ConnectorSourceWorker`].
@@ -354,13 +365,23 @@ impl ConnectorSourceWorkerHandle {
}

pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
tracing::debug!("drop_fragments: {:?}", fragment_ids);
if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
// ignore drop fragment error, just log it
tracing::warn!(error = %e.as_report(), "failed to drop fragments");
}
}

pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
tracing::debug!("finish_backfill: {:?}", fragment_ids);
if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
// ignore error, just log it
tracing::warn!(error = %e.as_report(), "failed to finish backfill");
}
}

pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
tracing::debug!("terminate: {:?}", dropped_fragments);
if let Some(dropped_fragments) = dropped_fragments {
self.drop_fragments(dropped_fragments.into_iter().collect());
}
@@ -378,6 +399,8 @@ pub enum SourceWorkerCommand {
Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
/// Async command to drop a fragment.
DropFragments(Vec<FragmentId>),
/// Async command to finish backfill.
FinishBackfill(Vec<FragmentId>),
/// Terminate the worker task.
Terminate,
}
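A hedged, self-contained sketch of the fire-and-forget command pattern used by the handle above (illustrative names, not the meta-service types): commands are pushed onto a channel consumed by the worker task, and send failures are only logged so a missing or lagging worker cannot block the caller.

use tokio::sync::mpsc;

#[derive(Debug)]
enum WorkerCommand {
    DropFragments(Vec<u32>),
    FinishBackfill(Vec<u32>),
    Terminate,
}

struct WorkerHandle {
    tx: mpsc::UnboundedSender<WorkerCommand>,
}

impl WorkerHandle {
    /// Fire-and-forget: failures are logged, never propagated to the caller.
    fn finish_backfill(&self, fragment_ids: Vec<u32>) {
        if let Err(e) = self.tx.send(WorkerCommand::FinishBackfill(fragment_ids)) {
            eprintln!("failed to finish backfill: {e}");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let handle = WorkerHandle { tx };
    let worker = tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                WorkerCommand::FinishBackfill(ids) => println!("cleaning up consumer groups for {ids:?}"),
                WorkerCommand::DropFragments(ids) => println!("dropping fragments {ids:?}"),
                WorkerCommand::Terminate => break,
            }
        }
    });
    handle.finish_backfill(vec![1, 2]);
    let _ = handle.tx.send(WorkerCommand::Terminate);
    worker.await.unwrap();
}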
7 changes: 6 additions & 1 deletion src/meta/src/stream/stream_manager.rs
@@ -41,7 +41,7 @@ use crate::manager::{
MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::SourceManagerRef;
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
@@ -392,6 +392,10 @@ impl GlobalStreamManager {
.await?,
);

let source_change = SourceChange::CreateJobFinished {
finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
};

let info = CreateStreamingJobCommandInfo {
stream_job_fragments,
upstream_root_actors,
@@ -444,6 +448,7 @@
.metadata_manager
.wait_streaming_job_finished(streaming_job.id() as _)
.await?;
self.source_manager.apply_source_change(source_change).await;
tracing::debug!(?streaming_job, "stream job finish");
Ok(version)
}
8 changes: 8 additions & 0 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -636,6 +636,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.await?;

if self.should_report_finished(&backfill_stage.states) {
// drop the backfill kafka consumers
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
futures::stream::pending().boxed().map(Either::Right),
select_strategy,
);

self.progress.finish(
barrier.epoch,
backfill_stage.total_backfilled_rows(),
@@ -734,6 +741,7 @@
}
}

std::mem::drop(backfill_stream);
let mut states = backfill_stage.states;
// Make sure `Finished` state is persisted.
self.backfill_state_store.set_states(states.clone()).await?;
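A hedged, self-contained sketch (illustrative types, not the executor's real ones) of the trick used above: reassigning backfill_stream drops the previous stream and releases whatever it owns (in the executor, the backfill Kafka consumer), while futures::stream::pending() is swapped in so later polls of that side simply never yield.

use futures::stream::{self, BoxStream, StreamExt};

/// Prints when dropped, standing in for a Kafka consumer owned by the stream.
struct NoisyConsumer(&'static str);

impl Drop for NoisyConsumer {
    fn drop(&mut self) {
        println!("dropped consumer: {}", self.0);
    }
}

#[tokio::main]
async fn main() {
    let consumer = NoisyConsumer("backfill");
    // The stream owns the "consumer"; dropping the stream drops the consumer with it.
    let mut backfill_stream: BoxStream<'static, i32> = stream::iter([1, 2])
        .map(move |x| {
            let _keep_alive = &consumer;
            x
        })
        .boxed();
    println!("first item: {:?}", backfill_stream.next().await);
    // Swap in a never-yielding stream: the old stream (and its consumer) is dropped
    // right here, which is the effect the executor wants once backfill has finished.
    backfill_stream = stream::pending::<i32>().boxed();
    assert!(futures::poll!(backfill_stream.next()).is_pending());
}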