feat(meta): show recovery cause when request is rejected (#13836)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Dec 6, 2023
1 parent 5af41d3 commit 2fba274
Showing 3 changed files with 31 additions and 29 deletions.
14 changes: 3 additions & 11 deletions src/meta/service/src/scale_service.rs
@@ -126,11 +126,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<RescheduleRequest>,
     ) -> Result<Response<RescheduleResponse>, Status> {
-        if !self.barrier_manager.is_running().await {
-            return Err(Status::unavailable(
-                "Rescheduling is unavailable for now. Likely the cluster is starting or recovering.",
-            ));
-        }
+        self.barrier_manager.check_status_running().await?;
 
         let RescheduleRequest {
             reschedules,
@@ -190,13 +186,9 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         request: Request<GetReschedulePlanRequest>,
     ) -> Result<Response<GetReschedulePlanResponse>, Status> {
-        let req = request.into_inner();
+        self.barrier_manager.check_status_running().await?;
 
-        if !self.barrier_manager.is_running().await {
-            return Err(Status::unavailable(
-                "Rescheduling is unavailable for now. Likely the cluster is starting or recovering.",
-            ));
-        }
+        let req = request.into_inner();
 
         let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
 
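Both handlers above return Result<Response<_>, Status>, while the new check_status_running returns MetaResult<()> (see the next file), so the `?` relies on a conversion from the meta error type into tonic::Status that already exists elsewhere in the codebase. As a rough, hypothetical sketch of how such a conversion can be wired up (MetaErrorLike, its message, and the chosen status code are placeholders, not RisingWave's actual implementation):

// Hypothetical sketch only: the real `MetaError` -> `Status` conversion lives
// elsewhere in the RisingWave codebase. `MetaErrorLike` is a stand-in type.
use tonic::Status;

#[derive(Debug, Clone)]
struct MetaErrorLike(String);

impl From<MetaErrorLike> for Status {
    fn from(e: MetaErrorLike) -> Self {
        // Propagating the full error text (including any attached cause) into
        // the gRPC status is what lets clients see why a request was rejected.
        Status::unavailable(e.0)
    }
}

fn check_status_running() -> Result<(), MetaErrorLike> {
    Err(MetaErrorLike(
        "The cluster is recovering: <underlying failure>".to_owned(),
    ))
}

fn handler() -> Result<(), Status> {
    check_status_running()?; // `?` converts via the `From` impl above
    Ok(())
}

fn main() {
    if let Err(status) = handler() {
        println!("{}", status.message());
    }
}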
33 changes: 27 additions & 6 deletions src/meta/src/barrier/mod.rs
@@ -106,12 +106,20 @@ pub(crate) type TableDefinitionMap = TableMap<String>;
 pub(crate) type TableNotifierMap = TableMap<Notifier>;
 pub(crate) type TableFragmentMap = TableMap<TableFragments>;
 
+/// The reason why the cluster is recovering.
+enum RecoveryReason {
+    /// After bootstrap.
+    Bootstrap,
+    /// After failure.
+    Failover(MetaError),
+}
+
 /// Status of barrier manager.
 enum BarrierManagerStatus {
     /// Barrier manager is starting.
     Starting,
     /// Barrier manager is under recovery.
-    Recovering,
+    Recovering(RecoveryReason),
     /// Barrier manager is running.
     Running,
 }
@@ -566,10 +574,19 @@ impl GlobalBarrierManager {
         (join_handle, shutdown_tx)
     }
 
-    /// Return whether the barrier manager is running.
-    pub async fn is_running(&self) -> bool {
+    /// Check the status of barrier manager, return error if it is not `Running`.
+    pub async fn check_status_running(&self) -> MetaResult<()> {
         let status = self.status.lock().await;
-        matches!(*status, BarrierManagerStatus::Running)
+        match &*status {
+            BarrierManagerStatus::Starting
+            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
+                bail!("The cluster is bootstrapping")
+            }
+            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
+                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
+            }
+            BarrierManagerStatus::Running => Ok(()),
+        }
    }
 
     /// Set barrier manager status.
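The Failover arm above wraps the original failure with anyhow's context, so the cause that triggered recovery survives all the way to the rejected request instead of being flattened into a generic "recovering" message. A small standalone illustration of how such a chained error renders (the error strings are invented for the example, not RisingWave's actual messages):

// Standalone illustration of anyhow's context chaining.
use anyhow::anyhow;

fn main() {
    // The failure that triggered recovery (message invented for the example).
    let cause = anyhow!("compute node 1 lost connection");
    // Wrap it the way the Failover arm above does.
    let err = cause.context("The cluster is recovering");

    // `{:#}` renders the whole chain:
    //   The cluster is recovering: compute node 1 lost connection
    println!("{:#}", err);
    // `{}` alone would print only the outermost message.
    println!("{}", err);
}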
Expand Down Expand Up @@ -631,7 +648,8 @@ impl GlobalBarrierManager {
// consistency.
// Even if there's no actor to recover, we still go through the recovery process to
// inject the first `Initial` barrier.
self.set_status(BarrierManagerStatus::Recovering).await;
self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
.await;
let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);

let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
Expand Down Expand Up @@ -1010,7 +1028,10 @@ impl GlobalBarrierManager {
}

if self.enable_recovery {
self.set_status(BarrierManagerStatus::Recovering).await;
self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
err.clone(),
)))
.await;
let latest_snapshot = self.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
let span = tracing::info_span!(
Expand Down
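Putting the pieces together: the error that triggered recovery is captured in the shared status when recovery starts, and every request that checks the status during recovery gets that cause back. A simplified, self-contained model of that flow, assuming only tokio and anyhow (the types and messages are placeholders, not RisingWave's):

// Simplified model of the status-gating pattern; not RisingWave's actual types.
use anyhow::{anyhow, bail, Result};
use tokio::sync::Mutex;

enum RecoveryReason {
    Bootstrap,
    Failover(String), // stand-in for the cloned MetaError
}

enum Status {
    Starting,
    Recovering(RecoveryReason),
    Running,
}

struct Manager {
    status: Mutex<Status>,
}

impl Manager {
    async fn set_status(&self, new: Status) {
        *self.status.lock().await = new;
    }

    async fn check_status_running(&self) -> Result<()> {
        match &*self.status.lock().await {
            Status::Starting | Status::Recovering(RecoveryReason::Bootstrap) => {
                bail!("The cluster is bootstrapping")
            }
            Status::Recovering(RecoveryReason::Failover(e)) => {
                Err(anyhow!(e.clone()).context("The cluster is recovering"))
            }
            Status::Running => Ok(()),
        }
    }
}

#[tokio::main]
async fn main() {
    let mgr = Manager { status: Mutex::new(Status::Running) };

    // A failure happens; recovery starts and records the cause.
    mgr.set_status(Status::Recovering(RecoveryReason::Failover(
        "compute node 1 lost connection".to_owned(),
    )))
    .await;

    // A request arriving during recovery now sees the cause, not a generic message.
    let err = mgr.check_status_running().await.unwrap_err();
    println!("{:#}", err); // The cluster is recovering: compute node 1 lost connection
}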
13 changes: 1 addition & 12 deletions src/meta/src/rpc/ddl_controller.rs
@@ -235,24 +235,13 @@ impl DdlController {
         }
     }
 
-    /// `check_barrier_manager_status` checks the status of the barrier manager, return unavailable
-    /// when it's not running.
-    async fn check_barrier_manager_status(&self) -> MetaResult<()> {
-        if !self.barrier_manager.is_running().await {
-            return Err(MetaError::unavailable(
-                "The cluster is starting or recovering",
-            ));
-        }
-        Ok(())
-    }
-
     /// `run_command` spawns a tokio coroutine to execute the target ddl command. When the client
     /// has been interrupted during executing, the request will be cancelled by tonic. Since we have
     /// a lot of logic for revert, status management, notification and so on, ensuring consistency
     /// would be a huge hassle and pain if we don't spawn here.
     pub async fn run_command(&self, command: DdlCommand) -> MetaResult<NotificationVersion> {
         if !command.allow_in_recovery() {
-            self.check_barrier_manager_status().await?;
+            self.barrier_manager.check_status_running().await?;
         }
         let ctrl = self.clone();
         let fut = async move {
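The `run_command` doc comment explains why the DDL future is spawned rather than awaited inline: if tonic cancels the request future on client disconnect, a spawned task keeps running, so the revert, status-management, and notification logic is never cut off halfway. A minimal sketch of that detachment pattern (the "DDL work" here is just a sleep, purely illustrative):

// Minimal sketch of detaching work from request cancellation with tokio::spawn.
use std::time::Duration;
use tokio::time::{sleep, timeout};

// Stand-in for `run_command`: the actual work is spawned so that dropping this
// future (tonic cancelling the request) does not abort the work itself.
async fn run_command() {
    let handle = tokio::spawn(async {
        sleep(Duration::from_millis(200)).await; // pretend this is the DDL logic
        println!("ddl work finished, even though the request was cancelled");
    });
    // Normally we await the result; if we get cancelled here, the task lives on.
    let _ = handle.await;
}

#[tokio::main]
async fn main() {
    // Simulate the client disconnecting after 50 ms: the request future is dropped...
    let _ = timeout(Duration::from_millis(50), run_command()).await;
    // ...but the spawned task keeps running and completes its work.
    sleep(Duration::from_millis(300)).await;
}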
