Skip to content

Commit

Permalink
fix & lock to be safer
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jan 15, 2025
1 parent 4da5a88 commit eb6edbf
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
15 changes: 15 additions & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,18 @@ pub fn current_cluster_version() -> String {
PG_VERSION, RW_VERSION, GIT_SHA
)
}

/// Panics if `debug_assertions` is set, otherwise logs a warning.
///
/// Note: unlike `panic` which returns `!`, this macro returns `()`,
/// which cannot be used like `result.unwrap_or_else(|| panic_if_debug!(...))`.
#[macro_export]
macro_rules! panic_if_debug {
($($arg:tt)*) => {
if cfg!(debug_assertions) {
panic!($($arg)*)
} else {
tracing::warn!($($arg)*)
}
};
}
3 changes: 2 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1256,10 +1256,11 @@ impl DdlController {
..
} = release_ctx;

let _guard = self.source_manager.pause_tick().await;
self.stream_manager
.drop_streaming_jobs(
risingwave_common::catalog::DatabaseId::new(database_id as _),
removed_actors.into_iter().map(|id| id as _).collect(),
removed_actors.iter().map(|id| *id as _).collect(),
removed_streaming_job_ids,
removed_state_table_ids,
removed_fragments.iter().map(|id| *id as _).collect(),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2336,8 +2336,7 @@ impl GlobalStreamManager {
fragment_actors,
};

tracing::debug!("pausing tick lock in source manager");
let _source_pause_guard = self.source_manager.paused.lock().await;
let _guard = self.source_manager.pause_tick().await;

self.barrier_scheduler
.run_config_change_command_with_pause(database_id, command)
Expand Down
33 changes: 20 additions & 13 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::Duration;
use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::panic_if_debug;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::{
fill_adaptive_split, ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo,
Expand All @@ -37,7 +38,7 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{oneshot, Mutex};
use tokio::sync::{oneshot, Mutex, MutexGuard};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tokio::{select, time};
Expand Down Expand Up @@ -236,13 +237,13 @@ impl SourceManagerCore {
managed_fragment_ids.remove(fragment_id);
dropped_ids.push(*fragment_id);
}
let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
panic!(
"source {} not found when dropping fragment {:?}",
source_id, dropped_ids
if let Some(handle) = self.managed_sources.get(&source_id) {
handle.drop_fragments(dropped_ids);
} else {
panic_if_debug!(
"source {source_id} not found when dropping fragment {dropped_ids:?}",
);
});
handle.drop_fragments(dropped_ids);
}
if managed_fragment_ids.is_empty() {
entry.remove();
}
Expand All @@ -256,13 +257,13 @@ impl SourceManagerCore {
}
}
if !dropped_ids.is_empty() {
let handle = self.managed_sources.get(source_id).unwrap_or_else(|| {
panic!(
"source {} not found when dropping fragment {:?}",
source_id, dropped_ids
if let Some(handle) = self.managed_sources.get(source_id) {
handle.drop_fragments(dropped_ids);
} else {
panic_if_debug!(
"source {source_id} not found when dropping fragment {dropped_ids:?}",
);
});
handle.drop_fragments(dropped_ids);
}
}
}
}
Expand Down Expand Up @@ -467,6 +468,12 @@ impl SourceManager {
}
}
}

/// Pause the tick loop in source manager until the returned guard is dropped.
pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
tracing::debug!("pausing tick lock in source manager");
self.paused.lock().await
}
}

pub enum SourceChange {
Expand Down

0 comments on commit eb6edbf

Please sign in to comment.