diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index c13eaabf90822..2dd75e1bb57c9 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -427,13 +427,13 @@ impl GlobalBarrierWorkerContextImpl { .collect(); if expired_worker_slots.is_empty() { - debug!("no expired worker slots, skipping."); + info!("no expired worker slots, skipping."); return self.resolve_graph_info(None).await; } - debug!("start migrate actors."); + info!("start migrate actors."); let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec(); - debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots); + info!("got to migrate worker slots {:#?}", to_migrate_worker_slots); let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots .intersection(&active_worker_slots) @@ -535,6 +535,8 @@ impl GlobalBarrierWorkerContextImpl { warn!(?changed, "get worker changed or timed out. Retry migrate"); } + info!("migration plan {:?}", plan); + mgr.catalog_controller.migrate_actors(plan).await?; info!("migrate actors succeed."); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index f2c95b0f59e3b..207b4dfdbd28f 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1125,12 +1125,17 @@ impl CatalogController { .insert(*actor_id); } - let expired_workers: HashSet<_> = plan.keys().map(|k| k.worker_id() as WorkerId).collect(); + let expired_or_changed_workers: HashSet<_> = + plan.keys().map(|k| k.worker_id() as WorkerId).collect(); let mut actor_migration_plan = HashMap::new(); for (worker, fragment) in actor_locations { - if expired_workers.contains(&worker) { - for (_, actors) in fragment { + if expired_or_changed_workers.contains(&worker) { + for (fragment_id, actors) in fragment { + debug!( + "worker {} expired or changed, migrating fragment {}", + worker, fragment_id + ); let worker_slot_to_actor: HashMap<_, _> = actors .iter() .enumerate() @@ -1140,8 +1145,9 @@ impl CatalogController { .collect(); for (worker_slot, actor) in worker_slot_to_actor { - actor_migration_plan - .insert(actor, plan[&worker_slot].worker_id() as WorkerId); + if let Some(target) = plan.get(&worker_slot) { + actor_migration_plan.insert(actor, target.worker_id() as WorkerId); + } } } }