Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][runtime_env] Retry on read error in rt env agent client response read #45513

Merged
merged 6 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions python/ray/_private/runtime_env/agent/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,22 @@ def __init__(
runtime_env_agent_port,
):
super().__init__()
self._runtime_env_dir = runtime_env_dir

self._logger = default_logger
self._logging_params = logging_params
self._logging_params.update(filename=self.LOG_FILENAME)
self._logger = setup_component_logger(
logger_name=default_logger.name, **self._logging_params
)
# Don't propagate logs to the root logger, because these logs
# might contain sensitive information. Instead, these logs should
# be confined to the runtime env agent log file `self.LOG_FILENAME`.
self._logger.propagate = False

self._logger.info("Starting runtime env agent at pid %s", os.getpid())
self._logger.info(f"Parent raylet pid is {os.environ.get('RAY_RAYLET_PID')}")

self._runtime_env_dir = runtime_env_dir
self._gcs_address = gcs_address
self._per_job_logger_cache = dict()
# Cache the results of creating envs to avoid repeatedly calling into
Expand Down Expand Up @@ -232,18 +246,6 @@ def __init__(
self.unused_runtime_env_processor,
)

self._logger = default_logger
self._logging_params.update(filename=self.LOG_FILENAME)
self._logger = setup_component_logger(
logger_name=default_logger.name, **self._logging_params
)
# Don't propagate logs to the root logger, because these logs
# might contain sensitive information. Instead, these logs should
# be confined to the runtime env agent log file `self.LOG_FILENAME`.
self._logger.propagate = False

self._logger.info("Starting runtime env agent at pid %s", os.getpid())
self._logger.info("Parent raylet pid is %s", int(os.environ["RAY_RAYLET_PID"]))
self._logger.info(
"Listening to address %s, port %d", address, runtime_env_agent_port
)
Expand Down
56 changes: 56 additions & 0 deletions python/ray/tests/test_runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,62 @@ def f():
ray.get(f.remote())


RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_CLASS_PATH = (
"ray.tests.test_runtime_env.RtEnvAgentSlowStartupPlugin" # noqa
)
RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_NAME = "RtEnvAgentSlowStartupPlugin"
RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_CLASS_PATH = (
"ray.tests.test_runtime_env.RtEnvAgentSlowStartupPlugin"
)


class RtEnvAgentSlowStartupPlugin(RuntimeEnvPlugin):

name = RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_NAME

def __init__(self):
# This happens in Runtime Env Agent start up process. Make it slow.
import time

time.sleep(5)
print("starting...")


@pytest.mark.parametrize(
"set_runtime_env_plugins",
[
'[{"class":"' + RT_ENV_AGENT_SLOW_STARTUP_PLUGIN_CLASS_PATH + '"}]',
],
indirect=True,
)
def test_slow_runtime_env_agent_startup_on_task_pressure(
shutdown_only, set_runtime_env_plugins
):
"""
Starts nodes with runtime env agent and a slow plugin. Then when the runtime env
agent is still starting up, we submit a lot of tasks to the cluster. The tasks
should wait for the runtime env agent to start up and then run.
https://github.com/ray-project/ray/issues/45353
Comment on lines +423 to +425
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not accurate? since we won't wait but just retry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wait every 1s before retry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually waits for agent_manager_retry_interval_ms_ = 100ms, until total = agent_register_timeout_ms = 10000ms = 10s and hard error out.

"""
ray.init()

@ray.remote(num_cpus=0.1)
def get_foo():
return os.environ.get("foo")

print("Submitting 20 tasks...")

# Each task has a different runtime env to ensure the agent is invoked for each.
vals = ray.get(
[
get_foo.options(runtime_env={"env_vars": {"foo": f"bar{i}"}}).remote()
for i in range(20)
]
)
print("20 tasks done.")
assert vals == [f"bar{i}" for i in range(20)]


class TestURICache:
def test_zero_cache_size(self):
uris_to_sizes = {"5": 5, "3": 3}
Expand Down
67 changes: 41 additions & 26 deletions src/ray/raylet/runtime_env_agent_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ namespace {
// Will call callback exactly once with pair{non-ok, any} or pair{ok, reply body}.
//
// Hard coded behavior:
// - method is POST.
// - version is HTTP/1.1.
// - content type is "application/octet-stream".
// - connection has no timeout (i.e. waits forever. This is because runtime env agent can
// - connection has infinite timeout (This is because runtime env agent can
// work for a long time.)
// - on_resolve and on_connect failures return NotFound. This allows retry on the
// server not (yet) started up.
// - on_read and on_write failures return IOError.
//
// Error handling: (return means invoking the fail_callback with the error)
// - on_resolve and on_connect failures return NotFound.
// - on_write and on_read failures return Disconnected.
// - if the HTTP response is received and well-formed, but the status code is not OK,
// return IOError.
//
// Spirit from
// https://www.boost.org/doc/libs/develop/libs/beast/example/http/client/async/http_client_async.cpp
Expand All @@ -63,6 +68,7 @@ class Session : public std::enable_shared_from_this<Session> {
static std::shared_ptr<Session> Create(net::io_context &ioc,
std::string_view host,
std::string_view port,
http::verb method,
std::string_view target,
std::string body,
std::function<void(std::string)> succ_callback,
Expand All @@ -72,6 +78,7 @@ class Session : public std::enable_shared_from_this<Session> {
return std::shared_ptr<Session>(new Session(ioc,
host,
port,
method,
target,
std::move(body),
std::move(succ_callback),
Expand All @@ -96,6 +103,7 @@ class Session : public std::enable_shared_from_this<Session> {
explicit Session(net::io_context &ioc,
std::string_view host,
std::string_view port,
http::verb method,
std::string_view target,
std::string body,
std::function<void(std::string)> succ_callback,
Expand All @@ -104,18 +112,19 @@ class Session : public std::enable_shared_from_this<Session> {
stream_(ioc),
host_(std::string(host)),
port_(std::string(port)),
method_(method),
succ_callback_(std::move(succ_callback)),
fail_callback_(std::move(fail_callback)) {
stream_.expires_never();
req_.method(http::verb::post);
req_.method(method_);
req_.target(target);
req_.body() = std::move(body);
req_.version(11); // HTTP/1.1
req_.set(http::field::host, host);
req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
req_.set(http::field::content_type, "application/octet-stream");
// aiohttp has a bug that, if you don't set this value, it returns 400.
// https://github.com/aio-libs/aiohttp/issues/7208
req_.content_length(req_.body().size());
// Sets Content-Length header.
req_.prepare_payload();
}

void Failed(ray::Status status) {
Expand All @@ -134,6 +143,7 @@ class Session : public std::enable_shared_from_this<Session> {
return;
}

stream_.expires_never();
// Make the connection on the IP address we get from a lookup
stream_.async_connect(
results, beast::bind_front_handler(&Session::on_connect, shared_from_this()));
Expand All @@ -145,19 +155,20 @@ class Session : public std::enable_shared_from_this<Session> {
return;
}

stream_.expires_never();
// Send the HTTP request to the remote host
http::async_write(
stream_, req_, beast::bind_front_handler(&Session::on_write, shared_from_this()));
}

void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);

if (ec) {
Failed(ray::Status::IOError("on_write " + ec.message()));
Failed(ray::Status::Disconnected("on_write " + ec.message() +
", bytes_transferred " +
std::to_string(bytes_transferred)));
return;
}

stream_.expires_never();
// Receive the HTTP response
http::async_read(stream_,
buffer_,
Expand All @@ -166,16 +177,17 @@ class Session : public std::enable_shared_from_this<Session> {
}

void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);

if (ec) {
Failed(ray::Status::IOError("on_read " + ec.message()));
Failed(ray::Status::Disconnected("on_read " + ec.message() +
", bytes_transferred " +
std::to_string(bytes_transferred)));
return;
}

if (http::to_status_class(res_.result()) == http::status_class::successful) {
Succeeded(std::move(res_).body());
} else {
Failed(ray::Status::IOError(absl::StrCat("POST result non-ok status code ",
Failed(ray::Status::IOError(absl::StrCat("HTTP request returns non-ok status code ",
res_.result_int(),
", body",
std::move(res_).body())));
Expand All @@ -193,6 +205,7 @@ class Session : public std::enable_shared_from_this<Session> {
beast::tcp_stream stream_;
std::string host_;
std::string port_;
http::verb method_;
std::function<void(std::string)> succ_callback_;
std::function<void(ray::Status)> fail_callback_;
beast::flat_buffer buffer_; // (Must persist between reads)
Expand Down Expand Up @@ -293,12 +306,13 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient {
delay_executor_([]() { QuickExit(); }, /*ms*/ 10000);
}

/// @brief Invokes `try_invoke_once`. If it fails with a ray::Status::NotFound error,
/// retries every after `agent_manager_retry_interval_ms` up until `deadline` passed.
/// After which, fail_callback is called with the NotFound error from `try_invoke_once`.
/// @brief Invokes `try_invoke_once`. If it fails with a network error, retries every
/// after `agent_manager_retry_interval_ms` up until `deadline` passed. After which,
/// fail_callback is called with the NotFound error from `try_invoke_once`.
///
/// Note that retry only happens on network errors. Application errors returned by the
/// server are not retried.
/// Note that retry only happens on network errors, i.e. NotFound and Disconnected, on
/// which cases we did not receive a well-formed HTTP response. Application errors
/// returned by the server are not retried.
///
/// If the retries took so long and exceeded deadline, Raylet exits immediately. Note
/// the check happens after `try_invoke_once` returns. This means if you have a
Expand All @@ -316,14 +330,13 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient {
FailCallback fail_callback,
int64_t deadline_ms) {
try_invoke_once(succ_callback, [=](ray::Status status) {
if (!status.IsNotFound()) {
if ((!status.IsNotFound()) && (!status.IsDisconnected())) {
// Non retryable errors, invoke fail_callback
fail_callback(status);
} else if (current_time_ms() > deadline_ms) {
RAY_LOG(ERROR) << "Runtime Env Agent timed out as NotFound in "
<< agent_register_timeout_ms_ << "ms. Status: " << status
<< ", address: " << this->address_ << ", port: " << this->port_str_
<< ", Suiciding...";
RAY_LOG(ERROR) << "Runtime Env Agent timed out in " << agent_register_timeout_ms_
<< "ms. Status: " << status << ", address: " << this->address_
<< ", port: " << this->port_str_ << ", existing immediately...";
ExitImmediately();
} else {
RAY_LOG(INFO) << "Runtime Env Agent network error: " << status
Expand Down Expand Up @@ -415,6 +428,7 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient {
io_context_,
address_,
port_str_,
http::verb::post,
HTTP_PATH_GET_OR_CREATE_RUNTIME_ENV,
std::move(payload),
/*succ_callback=*/
Expand Down Expand Up @@ -481,6 +495,7 @@ class HttpRuntimeEnvAgentClient : public RuntimeEnvAgentClient {
io_context_,
address_,
port_str_,
http::verb::post,
HTTP_PATH_DELETE_RUNTIME_ENV_IF_POSSIBLE,
std::move(payload),
/*succ_callback=*/
Expand Down
Loading