Skip to content

Commit

Permalink
fix: add a separate change command for job creation finish
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jan 17, 2025
1 parent 0a3eb99 commit 226e404
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 27 deletions.
5 changes: 5 additions & 0 deletions e2e_test/source_inline/commands.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ script = '''
set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka consumer groups..."
rpk group list | tail -n +2 | awk '{print $2}' | while read -r group; do
echo "Deleting Kafka consumer group: $group"
rpk group delete "$group"
done
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
echo "Deleting all schema registry subjects"
Expand Down
17 changes: 3 additions & 14 deletions e2e_test/source_inline/kafka/consumer_group.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,9 @@ EOF

# There are 2 consumer groups, 1 for batch query (not listed below), 1 for Source.
# All of them are "Empty" state with 0 members, because we manually `assign` partitions to them.


system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0


# The lag for MV's group is 0.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
----
0

# At the beginning, the Source's consumer group will not occur. They will be created after committing offset to Kafka.
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

# Test delete consumer group on drop

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ c

# There are 2 consumer groups, 1 for batch query (not listed below), 1 for MV.
# All of them are "Empty" state with 0 members, because we manually `assign` partitions to them.
# At the beginning, the MV's consumer group will not occur. They will be created after committing offset to Kafka.
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0
Expand Down
7 changes: 2 additions & 5 deletions src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::try_stream;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use risingwave_common::metrics::LabelGuardedIntGauge;
Expand Down Expand Up @@ -141,10 +141,6 @@ impl SplitReader for KafkaSplitReader {
tracing::debug!("backfill_info: {:?}", backfill_info);

consumer.assign(&tpl)?;
// commit the offsets to make the consumer group available immediately.
// (Otherwise we may fail to list or delete consumer groups)
tpl.set_all_offsets(Offset::Offset(0))?;
consumer.commit(&tpl, CommitMode::Sync).await?;

// The two parameters below are only used by developers for performance testing purposes,
// so we panic here on purpose if the input is not correctly recognized.
Expand Down Expand Up @@ -331,5 +327,6 @@ impl KafkaSplitReader {
// yield in the outer loop so that we can always guarantee that some messages are read
// every `MAX_CHUNK_SIZE`.
}
tracing::info!("kafka reader finished");
}
}
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ impl DdlController {
.await?;
if aborted {
tracing::warn!(id = job_id, "aborted streaming job");
// FIXME: might also need other cleanup here
if let Some(source_id) = source_id {
self.source_manager
.apply_source_change(SourceChange::DropSource {
Expand Down
28 changes: 22 additions & 6 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl SourceManagerCore {
pub fn apply_source_change(&mut self, source_change: SourceChange) {
let mut added_source_fragments = Default::default();
let mut added_backfill_fragments = Default::default();
let mut finished_backfill_fragments = Default::default();
let mut split_assignment = Default::default();
let mut dropped_actors = Default::default();
let mut fragment_replacements = Default::default();
Expand All @@ -121,6 +122,11 @@ impl SourceManagerCore {
added_backfill_fragments = added_backfill_fragments_;
split_assignment = split_assignment_;
}
SourceChange::CreateJobFinished {
finished_backfill_fragments: finished_backfill_fragments_,
} => {
finished_backfill_fragments = finished_backfill_fragments_;
}
SourceChange::SplitChange(split_assignment_) => {
split_assignment = split_assignment_;
}
Expand Down Expand Up @@ -182,18 +188,20 @@ impl SourceManagerCore {
}

for (source_id, fragments) in added_backfill_fragments {
// Note: when the backfill fragment is considered created, backfill must be finished.
self.backfill_fragments
.entry(source_id)
.or_default()
.extend(fragments);
}

for (source_id, fragments) in finished_backfill_fragments {
let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
panic!(
"source {} not found when adding backfill fragments {:?}",
source_id, fragments
);
});
handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
self.backfill_fragments
.entry(source_id)
.or_default()
.extend(fragments);
}

for (_, actor_splits) in split_assignment {
Expand Down Expand Up @@ -486,12 +494,20 @@ impl SourceManager {

pub enum SourceChange {
/// `CREATE SOURCE` (shared), or `CREATE MV`.
/// Note this is applied _after_ the job is successfully created (`post_collect` barrier).
/// This is applied after the job is successfully created (`post_collect` barrier).
CreateJob {
added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
/// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
split_assignment: SplitAssignment,
},
/// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
/// This is applied after `wait_streaming_job_finished`.
/// XXX: Should we merge `CreateJob` into this?
CreateJobFinished {
/// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
},
SplitChange(SplitAssignment),
/// `DROP SOURCE` or `DROP MV`
DropSource {
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/stream/source_manager/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,23 @@ impl ConnectorSourceWorkerHandle {
}

pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
tracing::debug!("drop_fragments: {:?}", fragment_ids);
if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
// ignore drop fragment error, just log it
tracing::warn!(error = %e.as_report(), "failed to drop fragments");
}
}

pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
tracing::debug!("finish_backfill: {:?}", fragment_ids);
if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
// ignore error, just log it
tracing::warn!(error = %e.as_report(), "failed to finish backfill");
}
}

pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
tracing::debug!("terminate: {:?}", dropped_fragments);
if let Some(dropped_fragments) = dropped_fragments {
self.drop_fragments(dropped_fragments.into_iter().collect());
}
Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::manager::{
MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::SourceManagerRef;
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
Expand Down Expand Up @@ -392,6 +392,10 @@ impl GlobalStreamManager {
.await?,
);

let source_change = SourceChange::CreateJobFinished {
finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
};

let info = CreateStreamingJobCommandInfo {
stream_job_fragments,
upstream_root_actors,
Expand Down Expand Up @@ -444,6 +448,7 @@ impl GlobalStreamManager {
.metadata_manager
.wait_streaming_job_finished(streaming_job.id() as _)
.await?;
self.source_manager.apply_source_change(source_change).await;
tracing::debug!(?streaming_job, "stream job finish");
Ok(version)
}
Expand Down
8 changes: 8 additions & 0 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.await?;

if self.should_report_finished(&backfill_stage.states) {
// drop the backfill kafka consumers
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
futures::stream::pending().boxed().map(Either::Right),
select_strategy,
);

self.progress.finish(
barrier.epoch,
backfill_stage.total_backfilled_rows(),
Expand Down Expand Up @@ -734,6 +741,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
}

std::mem::drop(backfill_stream);
let mut states = backfill_stage.states;
// Make sure `Finished` state is persisted.
self.backfill_state_store.set_states(states.clone()).await?;
Expand Down

0 comments on commit 226e404

Please sign in to comment.