From c9512fafbd1ac3eda3d2cbc0e5aa8b449231ea1e Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 26 Jul 2024 12:26:25 -0500 Subject: [PATCH 1/2] fix(watch): properly shut down persistent tasks --- crates/turborepo-lib/src/run/mod.rs | 23 +++++++++++++++++++ crates/turborepo-lib/src/run/watch.rs | 33 +++++++++++++++++++-------- crates/turborepo-ui/src/tui/app.rs | 3 +++ 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 16cf9731c5d60..0997a047713a3 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -207,6 +207,13 @@ impl Run { Ok(Some((sender, handle))) } + /// Returns a handle that can be used to stop a run + pub fn stopper(&self) -> RunStopper { + RunStopper { + manager: self.processes.clone(), + } + } + pub async fn run( &mut self, experimental_ui_sender: Option, @@ -215,6 +222,11 @@ impl Run { if let Some(subscriber) = self.signal_handler.subscribe() { let run_cache = self.run_cache.clone(); tokio::spawn(async move { + // Caching is disabled for watch so we don't need to wait on shutting down the + // cache. + if is_watch { + return; + } let _guard = subscriber.listen().await; let spinner = turborepo_ui::start_spinner("...Finishing writing to cache..."); if let Ok((status, closed)) = run_cache.shutdown_cache().await { @@ -439,3 +451,14 @@ impl Run { Ok(exit_code) } } + +#[derive(Debug, Clone)] +pub struct RunStopper { + manager: ProcessManager, +} + +impl RunStopper { + pub async fn stop(&self) { + self.manager.stop().await; + } +} diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index eecfbf8a79dd1..466bdd5e48e24 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -47,7 +47,7 @@ impl ChangedPackages { pub struct WatchClient { run: Run, watched_packages: HashSet, - persistent_tasks_handle: Option>>, + persistent_tasks_handle: Option, connector: DaemonConnector, base: CommandBase, telemetry: CommandEventBuilder, @@ -56,6 +56,11 @@ pub struct WatchClient { ui_handle: Option>>, } +struct PersistentRunHandle { + stopper: run::RunStopper, + run_task: JoinHandle>, +} + #[derive(Debug, Error, Diagnostic)] pub enum Error { #[error("failed to connect to daemon")] @@ -304,6 +309,14 @@ impl WatchClient { self.watched_packages = self.run.get_relevant_packages(); + // Clean up currently running persistent tasks + if let Some(PersistentRunHandle { stopper, run_task }) = + self.persistent_tasks_handle.take() + { + // Shut down the tasks for the run + stopper.stop().await; + run_task.abort(); + } if let Some(sender) = &self.ui_sender { let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph); sender @@ -312,18 +325,20 @@ impl WatchClient { } if self.run.has_persistent_tasks() { - // Abort old run - if let Some(run) = self.persistent_tasks_handle.take() { - run.abort(); - } - + debug_assert!( + self.persistent_tasks_handle.is_none(), + "persistent handle should be empty before creating a new one" + ); let mut persistent_run = self.run.create_run_for_persistent_tasks(); let ui_sender = self.ui_sender.clone(); // If we have persistent tasks, we run them on a separate thread // since persistent tasks don't finish - self.persistent_tasks_handle = Some(tokio::spawn(async move { - persistent_run.run(ui_sender, true).await - })); + self.persistent_tasks_handle = Some(PersistentRunHandle { + stopper: persistent_run.stopper(), + run_task: tokio::spawn( + async move { persistent_run.run(ui_sender, true).await }, + ), + }); // But we still run the regular tasks blocking let mut non_persistent_run = self.run.create_run_without_persistent_tasks(); diff --git a/crates/turborepo-ui/src/tui/app.rs b/crates/turborepo-ui/src/tui/app.rs index bf8f008855011..d841475c07a21 100644 --- a/crates/turborepo-ui/src/tui/app.rs +++ b/crates/turborepo-ui/src/tui/app.rs @@ -139,6 +139,7 @@ impl App { /// If finished, removes from finished and starts again as new task. #[tracing::instrument(skip(self, output_logs))] pub fn start_task(&mut self, task: &str, output_logs: OutputLogs) -> Result<(), Error> { + debug!("starting {task}"); // Name of currently highlighted task. // We will use this after the order switches. let highlighted_task = self @@ -202,6 +203,7 @@ impl App { /// Errors if given task wasn't a running task #[tracing::instrument(skip(self, result))] pub fn finish_task(&mut self, task: &str, result: TaskResult) -> Result<(), Error> { + debug!("finishing task {task}"); // Name of currently highlighted task. // We will use this after the order switches. let highlighted_task = self @@ -265,6 +267,7 @@ impl App { #[tracing::instrument(skip(self))] pub fn update_tasks(&mut self, tasks: Vec) { + debug!("updating task list: {tasks:?}"); // Make sure all tasks have a terminal output for task in &tasks { self.tasks From dd793dd7378f9867b03396313cf342fde36ee21a Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 26 Jul 2024 15:46:28 -0500 Subject: [PATCH 2/2] fix(watch): exit command if persistent task exits --- crates/turborepo-lib/src/run/watch.rs | 50 +++++++++++++++++++++------ 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 466bdd5e48e24..84c94c97b9ef3 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -100,6 +100,8 @@ pub enum Error { PackageChange(#[from] tonic::Status), #[error("could not connect to UI thread")] UISend(String), + #[error("persistent tasks exited unexpectedly")] + PersistentExit, } impl WatchClient { @@ -174,14 +176,32 @@ impl WatchClient { }; let run_fut = async { - loop { - notify_run.notified().await; + let mut persistent_exit = None; + 'outer: loop { + if let Some(persistent) = &mut persistent_exit { + // here we watch both notify *and* persistent task + // if notify exits, then continue per usual + // if persist exits, then we break out of loop with a + select! { + _ = notify_run.notified() => {}, + _ = persistent => { + break 'outer; + } + } + } else { + notify_run.notified().await; + } let changed_packages_guard = changed_packages.lock().await; if !changed_packages_guard.borrow().is_empty() { let changed_packages = changed_packages_guard.take(); - self.execute_run(changed_packages).await?; + let (_result, persistent_exit_rx) = self.execute_run(changed_packages).await?; + // Only update persist exit if a new one was created + if let Some(rx) = persistent_exit_rx { + persistent_exit = Some(rx); + } } } + Err(Error::PersistentExit) }; select! { @@ -231,7 +251,10 @@ impl WatchClient { Ok(()) } - async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result { + async fn execute_run( + &mut self, + changed_packages: ChangedPackages, + ) -> Result<(i32, Option>), Error> { // Should we recover here? match changed_packages { ChangedPackages::Some(packages) => { @@ -275,7 +298,7 @@ impl WatchClient { .build(&signal_handler, telemetry) .await?; - Ok(run.run(self.ui_sender.clone(), true).await?) + Ok((run.run(self.ui_sender.clone(), true).await?, None)) } ChangedPackages::All => { let mut args = self.base.args().clone(); @@ -333,18 +356,25 @@ impl WatchClient { let ui_sender = self.ui_sender.clone(); // If we have persistent tasks, we run them on a separate thread // since persistent tasks don't finish + let (persist_guard, persist_exit) = tokio::sync::oneshot::channel::<()>(); self.persistent_tasks_handle = Some(PersistentRunHandle { stopper: persistent_run.stopper(), - run_task: tokio::spawn( - async move { persistent_run.run(ui_sender, true).await }, - ), + run_task: tokio::spawn(async move { + // We move the guard in here so we can determine if the persist tasks + // exit as it'll go out of scope and drop. + let _guard = persist_guard; + persistent_run.run(ui_sender, true).await + }), }); // But we still run the regular tasks blocking let mut non_persistent_run = self.run.create_run_without_persistent_tasks(); - Ok(non_persistent_run.run(self.ui_sender.clone(), true).await?) + Ok(( + non_persistent_run.run(self.ui_sender.clone(), true).await?, + Some(persist_exit), + )) } else { - Ok(self.run.run(self.ui_sender.clone(), true).await?) + Ok((self.run.run(self.ui_sender.clone(), true).await?, None)) } } }