diff --git a/python/ray/_private/runtime_env/agent/runtime_env_agent.py b/python/ray/_private/runtime_env/agent/runtime_env_agent.py index 59959f19aad3a..87b5c9d2d807f 100644 --- a/python/ray/_private/runtime_env/agent/runtime_env_agent.py +++ b/python/ray/_private/runtime_env/agent/runtime_env_agent.py @@ -178,8 +178,22 @@ def __init__( runtime_env_agent_port, ): super().__init__() - self._runtime_env_dir = runtime_env_dir + + self._logger = default_logger self._logging_params = logging_params + self._logging_params.update(filename=self.LOG_FILENAME) + self._logger = setup_component_logger( + logger_name=default_logger.name, **self._logging_params + ) + # Don't propagate logs to the root logger, because these logs + # might contain sensitive information. Instead, these logs should + # be confined to the runtime env agent log file `self.LOG_FILENAME`. + self._logger.propagate = False + + self._logger.info("Starting runtime env agent at pid %s", os.getpid()) + self._logger.info(f"Parent raylet pid is {os.environ.get('RAY_RAYLET_PID')}") + + self._runtime_env_dir = runtime_env_dir self._gcs_address = gcs_address self._per_job_logger_cache = dict() # Cache the results of creating envs to avoid repeatedly calling into @@ -232,18 +246,6 @@ def __init__( self.unused_runtime_env_processor, ) - self._logger = default_logger - self._logging_params.update(filename=self.LOG_FILENAME) - self._logger = setup_component_logger( - logger_name=default_logger.name, **self._logging_params - ) - # Don't propagate logs to the root logger, because these logs - # might contain sensitive information. Instead, these logs should - # be confined to the runtime env agent log file `self.LOG_FILENAME`. - self._logger.propagate = False - - self._logger.info("Starting runtime env agent at pid %s", os.getpid()) - self._logger.info("Parent raylet pid is %s", int(os.environ["RAY_RAYLET_PID"])) self._logger.info( "Listening to address %s, port %d", address, runtime_env_agent_port ) diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 304d213a0f491..044455ec6a77c 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -282,11 +282,11 @@ def test_container_option_serialize(runtime_env_class): @pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.") @pytest.mark.parametrize("runtime_env_class", [dict, RuntimeEnv]) -def test_no_spurious_worker_startup(shutdown_only, runtime_env_class): +def test_no_spurious_worker_startup(shutdown_only, runtime_env_class, monkeypatch): """Test that no extra workers start up during a long env installation.""" # Causes agent to sleep for 15 seconds to simulate creating a runtime env. - os.environ["RAY_RUNTIME_ENV_SLEEP_FOR_TESTING_S"] = "15" + monkeypatch.setenv("RAY_RUNTIME_ENV_SLEEP_FOR_TESTING_S", "15") ray.init(num_cpus=1) @ray.remote @@ -343,10 +343,9 @@ def get_num_workers(): @pytest.fixture -def runtime_env_local_dev_env_var(): - os.environ["RAY_RUNTIME_ENV_LOCAL_DEV_MODE"] = "1" +def runtime_env_local_dev_env_var(monkeypatch): + monkeypatch.setenv("RAY_RUNTIME_ENV_LOCAL_DEV_MODE", "1") yield - del os.environ["RAY_RUNTIME_ENV_LOCAL_DEV_MODE"] @pytest.mark.skipif(sys.platform == "win32", reason="very slow on Windows.") @@ -388,6 +387,62 @@ def f(): ray.get(f.remote()) +RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_CLASS_PATH = ( + "ray.tests.test_runtime_env.RtEnvAgentSlowStartupPlugin" # noqa +) +RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_NAME = "RtEnvAgentSlowStartupPlugin" +RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_CLASS_PATH = ( + "ray.tests.test_runtime_env.RtEnvAgentSlowStartupPlugin" +) + + +class RtEnvAgentSlowStartupPlugin(RuntimeEnvPlugin): + + name = RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_NAME + + def __init__(self): + # This happens in Runtime Env Agent start up process. Make it slow. + import time + + time.sleep(5) + print("starting...") + + +@pytest.mark.parametrize( + "set_runtime_env_plugins", + [ + '[{"class":"' + RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_CLASS_PATH + '"}]', + ], + indirect=True, +) +def test_slow_runtime_env_agent_startup_on_task_pressure( + shutdown_only, set_runtime_env_plugins +): + """ + Starts nodes with runtime env agent and a slow plugin. Then when the runtime env + agent is still starting up, we submit a lot of tasks to the cluster. The tasks + should wait for the runtime env agent to start up and then run. + https://github.com/ray-project/ray/issues/45353 + """ + ray.init() + + @ray.remote(num_cpus=0.1) + def get_foo(): + return os.environ.get("foo") + + print("Submitting 20 tasks...") + + # Each task has a different runtime env to ensure the agent is invoked for each. + vals = ray.get( + [ + get_foo.options(runtime_env={"env_vars": {"foo": f"bar{i}"}}).remote() + for i in range(20) + ] + ) + print("20 tasks done.") + assert vals == [f"bar{i}" for i in range(20)] + + class TestURICache: def test_zero_cache_size(self): uris_to_sizes = {"5": 5, "3": 3} @@ -502,11 +557,10 @@ def delete_fn(uri, logger): @pytest.fixture -def enable_dev_mode(local_env_var_enabled): +def enable_dev_mode(local_env_var_enabled, monkeypatch): enabled = "1" if local_env_var_enabled else "0" - os.environ["RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED"] = enabled + monkeypatch.setenv("RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED", enabled) yield - del os.environ["RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED"] @pytest.mark.skipif( diff --git a/src/ray/raylet/runtime_env_agent_client.cc b/src/ray/raylet/runtime_env_agent_client.cc index 49bbe7e78f2f2..8b0477ede999c 100644 --- a/src/ray/raylet/runtime_env_agent_client.cc +++ b/src/ray/raylet/runtime_env_agent_client.cc @@ -45,12 +45,17 @@ namespace { // Will call callback exactly once with pair{non-ok, any} or pair{ok, reply body}. // // Hard coded behavior: +// - method is POST. +// - version is HTTP/1.1. // - content type is "application/octet-stream". -// - connection has no timeout (i.e. waits forever. This is because runtime env agent can +// - connection has infinite timeout (This is because runtime env agent can // work for a long time.) -// - on_resolve and on_connect failures return NotFound. This allows retry on the -// server not (yet) started up. -// - on_read and on_write failures return IOError. +// +// Error handling: (return means invoking the fail_callback with the error) +// - on_resolve and on_connect failures return NotFound. +// - on_write and on_read failures return Disconnected. +// - if the HTTP response is received and well-formed, but the status code is not OK, +// return IOError. // // Spirit from // https://www.boost.org/doc/libs/develop/libs/beast/example/http/client/async/http_client_async.cpp @@ -63,6 +68,7 @@ class Session : public std::enable_shared_from_this { static std::shared_ptr Create(net::io_context &ioc, std::string_view host, std::string_view port, + http::verb method, std::string_view target, std::string body, std::function succ_callback, @@ -72,6 +78,7 @@ class Session : public std::enable_shared_from_this { return std::shared_ptr(new Session(ioc, host, port, + method, target, std::move(body), std::move(succ_callback), @@ -96,6 +103,7 @@ class Session : public std::enable_shared_from_this { explicit Session(net::io_context &ioc, std::string_view host, std::string_view port, + http::verb method, std::string_view target, std::string body, std::function succ_callback, @@ -104,18 +112,19 @@ class Session : public std::enable_shared_from_this { stream_(ioc), host_(std::string(host)), port_(std::string(port)), + method_(method), succ_callback_(std::move(succ_callback)), fail_callback_(std::move(fail_callback)) { stream_.expires_never(); - req_.method(http::verb::post); + req_.method(method_); req_.target(target); req_.body() = std::move(body); + req_.version(11); // HTTP/1.1 req_.set(http::field::host, host); req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); req_.set(http::field::content_type, "application/octet-stream"); - // aiohttp has a bug that, if you don't set this value, it returns 400. - // https://github.com/aio-libs/aiohttp/issues/7208 - req_.content_length(req_.body().size()); + // Sets Content-Length header. + req_.prepare_payload(); } void Failed(ray::Status status) { @@ -134,6 +143,7 @@ class Session : public std::enable_shared_from_this { return; } + stream_.expires_never(); // Make the connection on the IP address we get from a lookup stream_.async_connect( results, beast::bind_front_handler(&Session::on_connect, shared_from_this())); @@ -145,19 +155,20 @@ class Session : public std::enable_shared_from_this { return; } + stream_.expires_never(); // Send the HTTP request to the remote host http::async_write( stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this())); } void on_write(beast::error_code ec, std::size_t bytes_transferred) { - boost::ignore_unused(bytes_transferred); - if (ec) { - Failed(ray::Status::IOError("on_write " + ec.message())); + Failed(ray::Status::Disconnected("on_write " + ec.message() + + ", bytes_transferred " + + std::to_string(bytes_transferred))); return; } - + stream_.expires_never(); // Receive the HTTP response http::async_read(stream_, buffer_, @@ -166,16 +177,17 @@ class Session : public std::enable_shared_from_this { } void on_read(beast::error_code ec, std::size_t bytes_transferred) { - boost::ignore_unused(bytes_transferred); - if (ec) { - Failed(ray::Status::IOError("on_read " + ec.message())); + Failed(ray::Status::Disconnected("on_read " + ec.message() + + ", bytes_transferred " + + std::to_string(bytes_transferred))); return; } + if (http::to_status_class(res_.result()) == http::status_class::successful) { Succeeded(std::move(res_).body()); } else { - Failed(ray::Status::IOError(absl::StrCat("POST result non-ok status code ", + Failed(ray::Status::IOError(absl::StrCat("HTTP request returns non-ok status code ", res_.result_int(), ", body", std::move(res_).body()))); @@ -193,6 +205,7 @@ class Session : public std::enable_shared_from_this { beast::tcp_stream stream_; std::string host_; std::string port_; + http::verb method_; std::function succ_callback_; std::function fail_callback_; beast::flat_buffer buffer_; // (Must persist between reads) @@ -293,12 +306,13 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient { delay_executor_([]() { QuickExit(); }, /*ms*/ 10000); } - /// @brief Invokes `try_invoke_once`. If it fails with a ray::Status::NotFound error, - /// retries every after `agent_manager_retry_interval_ms` up until `deadline` passed. - /// After which, fail_callback is called with the NotFound error from `try_invoke_once`. + /// @brief Invokes `try_invoke_once`. If it fails with a network error, retries every + /// after `agent_manager_retry_interval_ms` up until `deadline` passed. After which, + /// fail_callback is called with the NotFound error from `try_invoke_once`. /// - /// Note that retry only happens on network errors. Application errors returned by the - /// server are not retried. + /// Note that retry only happens on network errors, i.e. NotFound and Disconnected, on + /// which cases we did not receive a well-formed HTTP response. Application errors + /// returned by the server are not retried. /// /// If the retries took so long and exceeded deadline, Raylet exits immediately. Note /// the check happens after `try_invoke_once` returns. This means if you have a @@ -316,14 +330,13 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient { FailCallback fail_callback, int64_t deadline_ms) { try_invoke_once(succ_callback, [=](ray::Status status) { - if (!status.IsNotFound()) { + if ((!status.IsNotFound()) && (!status.IsDisconnected())) { // Non retryable errors, invoke fail_callback fail_callback(status); } else if (current_time_ms() > deadline_ms) { - RAY_LOG(ERROR) << "Runtime Env Agent timed out as NotFound in " - << agent_register_timeout_ms_ << "ms. Status: " << status - << ", address: " << this->address_ << ", port: " << this->port_str_ - << ", Suiciding..."; + RAY_LOG(ERROR) << "Runtime Env Agent timed out in " << agent_register_timeout_ms_ + << "ms. Status: " << status << ", address: " << this->address_ + << ", port: " << this->port_str_ << ", existing immediately..."; ExitImmediately(); } else { RAY_LOG(INFO) << "Runtime Env Agent network error: " << status @@ -415,6 +428,7 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient { io_context_, address_, port_str_, + http::verb::post, HTTP_PATH_GET_OR_CREATE_RUNTIME_ENV, std::move(payload), /*succ_callback=*/ @@ -481,6 +495,7 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient { io_context_, address_, port_str_, + http::verb::post, HTTP_PATH_DELETE_RUNTIME_ENV_IF_POSSIBLE, std::move(payload), /*succ_callback=*/