Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Use spdlog fd sink within pipe logger #50173

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ ray_cc_library(
srcs = ["pipe_logger.cc"],
deps = [
":compat",
":spdlog_fd_sink",
":stream_redirection_options",
":thread_utils",
":util",
Expand Down
231 changes: 77 additions & 154 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <thread>

#include "absl/strings/str_split.h"
#include "ray/util/spdlog_fd_sink.h"
#include "ray/util/thread_utils.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/rotating_file_sink.h"
Expand All @@ -39,24 +40,13 @@ struct StreamDumper {
std::deque<std::string> content ABSL_GUARDED_BY(mu);
};

// Used to write to dup-ed stdout and stderr; use shared pointer to make it copy
// constructible.
struct StdOstream {
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stdout_ostream;
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stderr_ostream;
};

// Start two threads:
// 1. A reader thread which continuously reads from [pipe_stream] until close;
// 2. A dumper thread which writes content to sink via [write_func].
template <typename WriteFunc, typename FlushFunc>
void StartStreamDump(
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_source>>
pipe_instream,
WriteFunc write_func,
FlushFunc flush_func,
std::shared_ptr<spdlog::logger> logger,
std::function<void()> on_close_completion) {
auto stream_dumper = std::make_shared<StreamDumper>();

Expand All @@ -70,11 +60,6 @@ void StartStreamDump(

// Exit at pipe read EOF.
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
}

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
}
Expand All @@ -87,8 +72,7 @@ void StartStreamDump(
}).detach();

std::thread([stream_dumper = stream_dumper,
write_func = std::move(write_func),
flush_func = std::move(flush_func),
logger = std::move(logger),
on_close_completion = std::move(on_close_completion)]() {
SetThreadName("PipeDumpThd");

Expand All @@ -108,22 +92,25 @@ void StartStreamDump(
curline = std::move(stream_dumper->content.front());
stream_dumper->content.pop_front();
} else if (stream_dumper->stopped) {
flush_func();
logger->flush();
on_close_completion();
return;
}
}

// Perform IO operation out of critical section.
write_func(std::move(curline));
logger->log(spdlog::level::info, std::move(curline));
}
}).detach();
}

// Create a spdlog logger with all sinks specified by the given option.
std::shared_ptr<spdlog::logger> CreateLogger(
const StreamRedirectionOption &stream_redirect_opt) {
std::vector<spdlog::sink_ptr> logging_sinks;
// TODO(hjiang): Could optimize to reduce heap allocation.
std::vector<spdlog::sink_ptr> sinks;

// Setup file sink.
spdlog::sink_ptr file_sink = nullptr;
if (stream_redirect_opt.rotation_max_size != std::numeric_limits<size_t>::max()) {
file_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
Expand All @@ -135,9 +122,56 @@ std::shared_ptr<spdlog::logger> CreateLogger(
stream_redirect_opt.file_path);
}
file_sink->set_level(spdlog::level::info);
sinks.emplace_back(std::move(file_sink));

// Setup fd sink for stdout and stderr.
#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);
auto stdout_sink = std::make_shared<non_owned_fd_sink_st>(duped_stdout_fd);
sinks.emplace_back(std::move(stdout_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);
auto stderr_sink = std::make_shared<non_owned_fd_sink_st>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}

#elif defined(_WIN32)
if (stream_redirect_opt.tee_to_stdout) {
HANDLE duped_stdout_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stdout_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";
auto stdout_sink = std::make_shared<non_owned_fd_sink_st>(duped_stdout_handle);
sinks.emplace_back(std::move(stdout_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
HANDLE duped_stderr_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";
auto stderr_sink = std::make_shared<non_owned_fd_sink_st>(duped_stderr_handle);
sinks.emplace_back(std::move(stderr_sink));
}
#endif

auto logger = std::make_shared<spdlog::logger>(
/*name=*/absl::StrFormat("pipe-logger-%s", stream_redirect_opt.file_path),
std::move(file_sink));
std::make_move_iterator(sinks.begin()),
std::make_move_iterator(sinks.end()));
logger->set_level(spdlog::level::info);
logger->set_pattern("%v"); // Only message string is logged.
return logger;
Expand All @@ -154,27 +188,22 @@ bool ShouldUsePipeStream(const StreamRedirectionOption &stream_redirect_opt) {
}

RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
boost::iostreams::file_descriptor_sink sink{file_path, std::ios_base::out};
auto handle = sink.handle();
boost::iostreams::file_descriptor_sink fd_sink{file_path, std::ios_base::out};
auto handle = fd_sink.handle();
auto ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
auto flush_fn = [ostream, handle]() {
// Flush stream internal buffer to fd.
ostream->flush();
// Flush file handle.
#if defined(__APPLE__) || defined(__linux__)
RAY_CHECK_EQ(fdatasync(handle), 0);
#elif defined(_WIN32)
RAY_CHECK(FlushFileBuffers(handle));
#endif
};
auto close_fn = [flush_fn, ostream]() {
flush_fn();
ostream->close();
};
return RedirectionFileHandle{
handle, std::move(ostream), std::move(flush_fn), std::move(close_fn)};
std::move(fd_sink));

auto logger_sink = std::make_shared<non_owned_fd_sink_st>(handle);
auto logger = std::make_shared<spdlog::logger>(
/*name=*/absl::StrFormat("pipe-logger-%s", file_path), std::move(logger_sink));
logger->set_level(spdlog::level::info);
logger->set_pattern("%v"); // Only message string is logged.

Comment on lines +197 to +202
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the case with no tee and no rotation? In that case, we don't write to the logger at all?

Copy link
Contributor Author

@dentiny dentiny Feb 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right: the logger is not a necessity here, but otherwise we would have two implementations of StreamRedirectionHandle:

  • one using a file descriptor, which takes a flush_fn
  • another using a logger, with no need for a flush_fn, which defeats the purpose of this PR: simplifying the code structure

// Lifecycle for the file handle is bound at [ostream] thus [close_fn].
auto close_fn = [ostream = std::move(ostream)]() { ostream->close(); };

return RedirectionFileHandle{handle, std::move(logger), std::move(close_fn)};
}
} // namespace

Expand All @@ -195,87 +224,22 @@ RedirectionFileHandle CreateRedirectionFileHandle(
// Invoked after flush and close finished.
auto on_close_completion = [promise = promise]() { promise->set_value(); };

StdOstream std_ostream{};

#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stdout_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stdout_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stderr_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stderr_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

int pipefd[2] = {0};
RAY_CHECK_EQ(pipe(pipefd), 0);
int read_handle = pipefd[0];
int write_handle = pipefd[1];
boost::iostreams::file_descriptor_source pipe_read_source{
read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};

#elif defined(_WIN32)
if (stream_redirect_opt.tee_to_stdout) {
HANDLE duped_stdout_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stdout_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";

boost::iostreams::file_descriptor_sink sink{
duped_stdout_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stdout_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
HANDLE duped_stderr_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";

boost::iostreams::file_descriptor_sink sink{
duped_stderr_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stderr_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

HANDLE read_handle = nullptr;
HANDLE write_handle = nullptr;
SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), nullptr, TRUE};
RAY_CHECK(CreatePipe(&read_handle, &write_handle, &sa, 0)) << "Fails to create pipe";
#endif

boost::iostreams::file_descriptor_source pipe_read_source{
read_handle,
/*file_descriptor_flags=*/boost::iostreams::close_handle};
read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_handle,
/*file_descriptor_flags=*/boost::iostreams::close_handle};

#endif
write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};

auto pipe_instream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_source>>(
Expand All @@ -291,51 +255,10 @@ RedirectionFileHandle CreateRedirectionFileHandle(
};

auto logger = CreateLogger(stream_redirect_opt);

// [content] is exactly what application writes to pipe, including the trailing
// newliner, if any.
auto write_fn = [logger,
stream_redirect_opt = stream_redirect_opt,
std_ostream = std_ostream](std::string content) {
if (stream_redirect_opt.tee_to_stdout) {
std_ostream.stdout_ostream->write(content.data(), content.length());
RAY_CHECK(std_ostream.stdout_ostream->good());
}
if (stream_redirect_opt.tee_to_stderr) {
std_ostream.stderr_ostream->write(content.data(), content.length());
RAY_CHECK(std_ostream.stderr_ostream->good());
}
if (logger != nullptr) {
// spdlog adds newliner for every content, no need to maintan the application-passed
// one.
if (!content.empty() && content.back() == '\n') {
content.pop_back();
}
logger->log(spdlog::level::info, content);
}
};
auto flush_fn =
[logger, stream_redirect_opt = stream_redirect_opt, std_ostream = std_ostream]() {
if (logger != nullptr) {
logger->flush();
}
if (stream_redirect_opt.tee_to_stdout) {
std_ostream.stdout_ostream->flush();
RAY_CHECK(std_ostream.stdout_ostream->good());
}
if (stream_redirect_opt.tee_to_stderr) {
std_ostream.stderr_ostream->flush();
RAY_CHECK(std_ostream.stderr_ostream->good());
}
};

StartStreamDump(std::move(pipe_instream),
std::move(write_fn),
flush_fn,
std::move(on_close_completion));
StartStreamDump(std::move(pipe_instream), logger, std::move(on_close_completion));

RedirectionFileHandle redirection_file_handle{
write_handle, std::move(pipe_ostream), std::move(flush_fn), std::move(close_fn)};
write_handle, logger, std::move(close_fn)};

return redirection_file_handle;
}
Expand Down
Loading