diff --git a/amlb/utils/process.py b/amlb/utils/process.py index 97c557a86..3752f85b3 100644 --- a/amlb/utils/process.py +++ b/amlb/utils/process.py @@ -49,6 +49,10 @@ def file_lock(path, timeout=-1): yield +class StaleProcessError(subprocess.TimeoutExpired): + pass + + def run_subprocess( *popenargs, input=None, @@ -104,7 +108,16 @@ def communicate(process, input=None, timeout=None): except: # also handles kb interrupts process.kill() raise + retcode = process.poll() + if retcode is None: + # Process still lives => communication stopped because of activity timeout + process.kill() + process.wait() + raise StaleProcessError( + process.args, float("nan"), output=stdout, stderr=stderr + ) + if check and retcode: raise subprocess.CalledProcessError( retcode, process.args, output=stdout, stderr=stderr @@ -186,6 +199,9 @@ def read_pipe(pipe, timeout): pipes = as_list(pipe) # wait until a pipe is ready for reading, non-Windows only. ready, *_ = select.select(pipes, [], [], timeout) + # if a pipe is not ready it could be timeout or it could be end of process + # so at this point we do not know. Only after the communicate function is over do we know. + # i.e., if the process is still running it does not have a retcode. reads = [""] * len(pipes) # print update for each pipe that is ready for reading for i, p in enumerate(pipes): @@ -315,8 +331,6 @@ def communicate(*args, **kwargs): log.log(params.output_level, e.stdout) if e.stderr: log.log(params.error_level, e.stderr) - # error_tail = tail(e.stderr, 25) if e.stderr else 'Unknown Error' - # raise subprocess.SubprocessError("Error when running command `{cmd}`: {error}".format(cmd=full_cmd, error=error_tail)) raise e diff --git a/resources/config.yaml b/resources/config.yaml index d9976b1f6..b990f43d1 100644 --- a/resources/config.yaml +++ b/resources/config.yaml @@ -27,6 +27,7 @@ archive: ['logs'] # list of output folders that should be archived by defa setup: # configuration namespace for the framework setup phase. live_output: true # set to true to stream the output of setup commands, if false they are only printed when setup is complete. activity_timeout: 600 # when using live output, subprocess will be considered as hanging if nothing was printed during this activity time. + # No effect is live_output is set to 'False'. frameworks: # configuration namespace for the frameworks definitions. definition_file: # list of yaml files describing the frameworks base definitions. diff --git a/runbenchmark.py b/runbenchmark.py index 00cfef132..431e865ef 100644 --- a/runbenchmark.py +++ b/runbenchmark.py @@ -18,6 +18,8 @@ str2bool, str_sanitize, zip_path, + StaleProcessError, + Namespace, ) from amlb import log, AutoMLError from amlb.defaults import default_dirs @@ -358,9 +360,19 @@ args.mode, ) - bench.setup(amlb.SetupMode[args.setup]) - if args.setup != "only": - res = bench.run(args.task, args.fold) + try: + bench.setup(amlb.SetupMode[args.setup]) + except StaleProcessError as e: + setting = "setup.activity_timeout" + timeout = Namespace.get(amlb_res.config, setting) + log.error( + f"Process '{e.cmd}' was aborted after producing no output for {timeout} seconds. " + f"If the process is expected to take more time, please raise the '{setting}' limit." + ) + exit_code = 1 + else: + if args.setup != "only": + res = bench.run(args.task, args.fold) except (ValueError, AutoMLError) as e: log.error("\nERROR:\n%s", e) if extras.get("verbose") is True: diff --git a/tests/unit/amlb/utils/process/test_run.py b/tests/unit/amlb/utils/process/test_run.py new file mode 100644 index 000000000..e6df0d62d --- /dev/null +++ b/tests/unit/amlb/utils/process/test_run.py @@ -0,0 +1,20 @@ +import pytest + +from amlb.utils import StaleProcessError, run_cmd + + +@pytest.mark.parametrize("mode", ["line", "block", True]) +def test_subprocess_detects_stale_process(mode): + one_ms = 0.001 + with pytest.raises(StaleProcessError): + run_cmd(f"sleep {one_ms}", _activity_timeout_=one_ms / 2, _live_output_=mode) + + +def test_subprocess_does_not_raises_if_process_exits_early(): + run_cmd("echo hi", _activity_timeout_=1, _live_output_=True) + + +@pytest.mark.xfail(reason="Detection of stale processes currently requires output") +def test_subprocess_does_not_raises_on_silent_process(): + one_ms = 0.001 + run_cmd(f"sleep {one_ms}", _activity_timeout_=one_ms / 2, _live_output_=True)