diff --git a/src/ray/util/BUILD b/src/ray/util/BUILD index 80162cd425d14..f723e84131039 100644 --- a/src/ray/util/BUILD +++ b/src/ray/util/BUILD @@ -269,6 +269,7 @@ ray_cc_library( srcs = ["pipe_logger.cc"], deps = [ ":compat", + ":spdlog_fd_sink", ":stream_redirection_options", ":thread_utils", ":util", diff --git a/src/ray/util/pipe_logger.cc b/src/ray/util/pipe_logger.cc index 97536a047d6be..5d594fb7e1bdf 100644 --- a/src/ray/util/pipe_logger.cc +++ b/src/ray/util/pipe_logger.cc @@ -24,6 +24,7 @@ #include #include "absl/strings/str_split.h" +#include "ray/util/spdlog_fd_sink.h" #include "ray/util/thread_utils.h" #include "spdlog/sinks/basic_file_sink.h" #include "spdlog/sinks/rotating_file_sink.h" @@ -39,24 +40,13 @@ struct StreamDumper { std::deque content ABSL_GUARDED_BY(mu); }; -// Used to write to dup-ed stdout and stderr; use shared pointer to make it copy -// constructible. -struct StdOstream { - std::shared_ptr> - stdout_ostream; - std::shared_ptr> - stderr_ostream; -}; - // Start two threads: // 1. A reader thread which continuously reads from [pipe_stream] until close; // 2. A dumper thread which writes content to sink via [write_func]. -template void StartStreamDump( std::shared_ptr> pipe_instream, - WriteFunc write_func, - FlushFunc flush_func, + std::shared_ptr logger, std::function on_close_completion) { auto stream_dumper = std::make_shared(); @@ -70,11 +60,6 @@ void StartStreamDump( // Exit at pipe read EOF. while (std::getline(*pipe_instream, newline)) { - // Backfill newliner for current segment. - if (!pipe_instream->eof()) { - newline += '\n'; - } - absl::MutexLock lock(&stream_dumper->mu); stream_dumper->content.emplace_back(std::move(newline)); } @@ -87,8 +72,7 @@ void StartStreamDump( }).detach(); std::thread([stream_dumper = stream_dumper, - write_func = std::move(write_func), - flush_func = std::move(flush_func), + logger = std::move(logger), on_close_completion = std::move(on_close_completion)]() { SetThreadName("PipeDumpThd"); @@ -108,14 +92,14 @@ void StartStreamDump( curline = std::move(stream_dumper->content.front()); stream_dumper->content.pop_front(); } else if (stream_dumper->stopped) { - flush_func(); + logger->flush(); on_close_completion(); return; } } // Perform IO operation out of critical section. - write_func(std::move(curline)); + logger->log(spdlog::level::info, std::move(curline)); } }).detach(); } @@ -123,7 +107,10 @@ void StartStreamDump( // Create a spdlog logger with all sinks specified by the given option. std::shared_ptr CreateLogger( const StreamRedirectionOption &stream_redirect_opt) { - std::vector logging_sinks; + // TODO(hjiang): Could optimize to reduce heap allocation. + std::vector sinks; + + // Setup file sink. spdlog::sink_ptr file_sink = nullptr; if (stream_redirect_opt.rotation_max_size != std::numeric_limits::max()) { file_sink = std::make_shared( @@ -135,9 +122,56 @@ std::shared_ptr CreateLogger( stream_redirect_opt.file_path); } file_sink->set_level(spdlog::level::info); + sinks.emplace_back(std::move(file_sink)); + + // Setup fd sink for stdout and stderr. +#if defined(__APPLE__) || defined(__linux__) + if (stream_redirect_opt.tee_to_stdout) { + int duped_stdout_fd = dup(STDOUT_FILENO); + RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno); + auto stdout_sink = std::make_shared(duped_stdout_fd); + sinks.emplace_back(std::move(stdout_sink)); + } + if (stream_redirect_opt.tee_to_stderr) { + int duped_stderr_fd = dup(STDERR_FILENO); + RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno); + auto stderr_sink = std::make_shared(duped_stderr_fd); + sinks.emplace_back(std::move(stderr_sink)); + } + +#elif defined(_WIN32) + if (stream_redirect_opt.tee_to_stdout) { + HANDLE duped_stdout_handle; + BOOL result = DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_OUTPUT_HANDLE), + GetCurrentProcess(), + &duped_stdout_handle, + 0, + FALSE, + DUPLICATE_SAME_ACCESS); + RAY_CHECK(result) << "Fails to duplicate stdout handle"; + auto stdout_sink = std::make_shared(duped_stdout_handle); + sinks.emplace_back(std::move(stdout_sink)); + } + if (stream_redirect_opt.tee_to_stderr) { + HANDLE duped_stderr_handle; + BOOL result = DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_ERROR_HANDLE), + GetCurrentProcess(), + &duped_stderr_handle, + 0, + FALSE, + DUPLICATE_SAME_ACCESS); + RAY_CHECK(result) << "Fails to duplicate stderr handle"; + auto stderr_sink = std::make_shared(duped_stderr_handle); + sinks.emplace_back(std::move(stderr_sink)); + } +#endif + auto logger = std::make_shared( /*name=*/absl::StrFormat("pipe-logger-%s", stream_redirect_opt.file_path), - std::move(file_sink)); + std::make_move_iterator(sinks.begin()), + std::make_move_iterator(sinks.end())); logger->set_level(spdlog::level::info); logger->set_pattern("%v"); // Only message string is logged. return logger; @@ -154,27 +188,22 @@ bool ShouldUsePipeStream(const StreamRedirectionOption &stream_redirect_opt) { } RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) { - boost::iostreams::file_descriptor_sink sink{file_path, std::ios_base::out}; - auto handle = sink.handle(); + boost::iostreams::file_descriptor_sink fd_sink{file_path, std::ios_base::out}; + auto handle = fd_sink.handle(); auto ostream = std::make_shared>( - std::move(sink)); - auto flush_fn = [ostream, handle]() { - // Flush stream internal buffer to fd. - ostream->flush(); -// Flush file handle. -#if defined(__APPLE__) || defined(__linux__) - RAY_CHECK_EQ(fdatasync(handle), 0); -#elif defined(_WIN32) - RAY_CHECK(FlushFileBuffers(handle)); -#endif - }; - auto close_fn = [flush_fn, ostream]() { - flush_fn(); - ostream->close(); - }; - return RedirectionFileHandle{ - handle, std::move(ostream), std::move(flush_fn), std::move(close_fn)}; + std::move(fd_sink)); + + auto logger_sink = std::make_shared(handle); + auto logger = std::make_shared( + /*name=*/absl::StrFormat("pipe-logger-%s", file_path), std::move(logger_sink)); + logger->set_level(spdlog::level::info); + logger->set_pattern("%v"); // Only message string is logged. + + // Lifecycle for the file handle is bound at [ostream] thus [close_fn]. + auto close_fn = [ostream = std::move(ostream)]() { ostream->close(); }; + + return RedirectionFileHandle{handle, std::move(logger), std::move(close_fn)}; } } // namespace @@ -195,87 +224,22 @@ RedirectionFileHandle CreateRedirectionFileHandle( // Invoked after flush and close finished. auto on_close_completion = [promise = promise]() { promise->set_value(); }; - StdOstream std_ostream{}; - #if defined(__APPLE__) || defined(__linux__) - if (stream_redirect_opt.tee_to_stdout) { - int duped_stdout_fd = dup(STDOUT_FILENO); - RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno); - - boost::iostreams::file_descriptor_sink sink{ - duped_stdout_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle}; - std_ostream.stdout_ostream = std::make_shared< - boost::iostreams::stream>( - std::move(sink)); - } - if (stream_redirect_opt.tee_to_stderr) { - int duped_stderr_fd = dup(STDERR_FILENO); - RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno); - - boost::iostreams::file_descriptor_sink sink{ - duped_stderr_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle}; - std_ostream.stderr_ostream = std::make_shared< - boost::iostreams::stream>( - std::move(sink)); - } - int pipefd[2] = {0}; RAY_CHECK_EQ(pipe(pipefd), 0); int read_handle = pipefd[0]; int write_handle = pipefd[1]; - boost::iostreams::file_descriptor_source pipe_read_source{ - read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle}; - boost::iostreams::file_descriptor_sink pipe_write_sink{ - write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle}; - #elif defined(_WIN32) - if (stream_redirect_opt.tee_to_stdout) { - HANDLE duped_stdout_handle; - BOOL result = DuplicateHandle(GetCurrentProcess(), - GetStdHandle(STD_OUTPUT_HANDLE), - GetCurrentProcess(), - &duped_stdout_handle, - 0, - FALSE, - DUPLICATE_SAME_ACCESS); - RAY_CHECK(result) << "Fails to duplicate stdout handle"; - - boost::iostreams::file_descriptor_sink sink{ - duped_stdout_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle}; - std_ostream.stdout_ostream = std::make_shared< - boost::iostreams::stream>( - std::move(sink)); - } - if (stream_redirect_opt.tee_to_stderr) { - HANDLE duped_stderr_handle; - BOOL result = DuplicateHandle(GetCurrentProcess(), - GetStdHandle(STD_ERROR_HANDLE), - GetCurrentProcess(), - &duped_stderr_handle, - 0, - FALSE, - DUPLICATE_SAME_ACCESS); - RAY_CHECK(result) << "Fails to duplicate stderr handle"; - - boost::iostreams::file_descriptor_sink sink{ - duped_stderr_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle}; - std_ostream.stderr_ostream = std::make_shared< - boost::iostreams::stream>( - std::move(sink)); - } - HANDLE read_handle = nullptr; HANDLE write_handle = nullptr; SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), nullptr, TRUE}; RAY_CHECK(CreatePipe(&read_handle, &write_handle, &sa, 0)) << "Fails to create pipe"; +#endif + boost::iostreams::file_descriptor_source pipe_read_source{ - read_handle, - /*file_descriptor_flags=*/boost::iostreams::close_handle}; + read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle}; boost::iostreams::file_descriptor_sink pipe_write_sink{ - write_handle, - /*file_descriptor_flags=*/boost::iostreams::close_handle}; - -#endif + write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle}; auto pipe_instream = std::make_shared< boost::iostreams::stream>( @@ -291,51 +255,10 @@ RedirectionFileHandle CreateRedirectionFileHandle( }; auto logger = CreateLogger(stream_redirect_opt); - - // [content] is exactly what application writes to pipe, including the trailing - // newliner, if any. - auto write_fn = [logger, - stream_redirect_opt = stream_redirect_opt, - std_ostream = std_ostream](std::string content) { - if (stream_redirect_opt.tee_to_stdout) { - std_ostream.stdout_ostream->write(content.data(), content.length()); - RAY_CHECK(std_ostream.stdout_ostream->good()); - } - if (stream_redirect_opt.tee_to_stderr) { - std_ostream.stderr_ostream->write(content.data(), content.length()); - RAY_CHECK(std_ostream.stderr_ostream->good()); - } - if (logger != nullptr) { - // spdlog adds newliner for every content, no need to maintan the application-passed - // one. - if (!content.empty() && content.back() == '\n') { - content.pop_back(); - } - logger->log(spdlog::level::info, content); - } - }; - auto flush_fn = - [logger, stream_redirect_opt = stream_redirect_opt, std_ostream = std_ostream]() { - if (logger != nullptr) { - logger->flush(); - } - if (stream_redirect_opt.tee_to_stdout) { - std_ostream.stdout_ostream->flush(); - RAY_CHECK(std_ostream.stdout_ostream->good()); - } - if (stream_redirect_opt.tee_to_stderr) { - std_ostream.stderr_ostream->flush(); - RAY_CHECK(std_ostream.stderr_ostream->good()); - } - }; - - StartStreamDump(std::move(pipe_instream), - std::move(write_fn), - flush_fn, - std::move(on_close_completion)); + StartStreamDump(std::move(pipe_instream), logger, std::move(on_close_completion)); RedirectionFileHandle redirection_file_handle{ - write_handle, std::move(pipe_ostream), std::move(flush_fn), std::move(close_fn)}; + write_handle, logger, std::move(close_fn)}; return redirection_file_handle; } diff --git a/src/ray/util/pipe_logger.h b/src/ray/util/pipe_logger.h index bfee9f120f3c6..7c2377466fbe9 100644 --- a/src/ray/util/pipe_logger.h +++ b/src/ray/util/pipe_logger.h @@ -53,19 +53,12 @@ class RedirectionFileHandle { // @param termination_synchronizer is used to block wait until destruction operation // finishes. - RedirectionFileHandle( - MEMFD_TYPE_NON_UNIQUE write_handle, - std::shared_ptr> - pipe_ostream, - std::function flush_fn, - std::function close_fn) + RedirectionFileHandle(MEMFD_TYPE_NON_UNIQUE write_handle, + std::shared_ptr logger, + std::function close_fn) : write_handle_(write_handle), - pipe_ostream_(std::move(pipe_ostream)), - flush_fn_(std::move(flush_fn)), - close_fn_(std::move(close_fn)) { - RAY_CHECK(flush_fn_); - RAY_CHECK(close_fn_); - } + logger_(std::move(logger)), + close_fn_(std::move(close_fn)) {} RedirectionFileHandle(const RedirectionFileHandle &) = delete; RedirectionFileHandle &operator=(const RedirectionFileHandle &) = delete; ~RedirectionFileHandle() = default; @@ -73,8 +66,7 @@ class RedirectionFileHandle { RedirectionFileHandle(RedirectionFileHandle &&rhs) { write_handle_ = rhs.write_handle_; rhs.write_handle_ = INVALID_FD; - pipe_ostream_ = std::move(rhs.pipe_ostream_); - flush_fn_ = std::move(rhs.flush_fn_); + logger_ = std::move(rhs.logger_); close_fn_ = std::move(rhs.close_fn_); } RedirectionFileHandle &operator=(RedirectionFileHandle &&rhs) { @@ -83,18 +75,17 @@ class RedirectionFileHandle { } write_handle_ = rhs.write_handle_; rhs.write_handle_ = INVALID_FD; - pipe_ostream_ = std::move(rhs.pipe_ostream_); - flush_fn_ = std::move(rhs.flush_fn_); + logger_ = std::move(rhs.logger_); close_fn_ = std::move(rhs.close_fn_); return *this; } void Close() { if (write_handle_ != INVALID_FD) { close_fn_(); - write_handle_ = INVALID_FD; - // Unset flush and close functor to close logger and underlying file handle. - flush_fn_ = nullptr; + // Destruct all resources. + write_handle_ = INVALID_FD; + logger_ = nullptr; close_fn_ = nullptr; } } @@ -104,27 +95,29 @@ class RedirectionFileHandle { // TODO(hjiang): Current method only flushes whatever we send to logger, but not those // in the pipe; a better approach is flush pipe, send FLUSH indicator and block wait // until logger sync over. - void Flush() { - RAY_CHECK(flush_fn_); - flush_fn_(); - } + void Flush() { logger_->flush(); } MEMFD_TYPE_NON_UNIQUE GetWriteHandle() const { return write_handle_; } // Write the given data into redirection handle; currently only for testing usage. - void CompleteWrite(const char *data, size_t len) { pipe_ostream_->write(data, len); } + // + // TODO(hjiang): Use platform compatible API, see + // https://github.com/ray-project/ray/pull/50170 + void CompleteWrite(const char *data, size_t len) { +#if defined(__APPLE__) || defined(__linux__) + [[maybe_unused]] auto x = write(write_handle_, data, len); +#elif defined(_WIN32) + DWORD bytes_written; + [[maybe_unused]] auto x = + WriteFile(write_handle_, data, (DWORD)len, &bytes_written, NULL); +#endif + } private: MEMFD_TYPE_NON_UNIQUE write_handle_; - // A high-level wrapper for [write_handle_]. - std::shared_ptr> - pipe_ostream_; - - // Used to flush log message. - std::function flush_fn_; + std::shared_ptr logger_; - // Used to close write handle, and block until destruction completes std::function close_fn_; }; diff --git a/src/ray/util/tests/pipe_logger_test.cc b/src/ray/util/tests/pipe_logger_test.cc index cc6a5b1e7042c..9f2a9509736ac 100644 --- a/src/ray/util/tests/pipe_logger_test.cc +++ b/src/ray/util/tests/pipe_logger_test.cc @@ -150,7 +150,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { stream_redirection_handle.Close(); const std::string stdout_content = testing::internal::GetCapturedStdout(); - EXPECT_EQ(stdout_content, kContent); + EXPECT_EQ(stdout_content, "hello\n"); // Pipe logger automatically adds a newliner at the end. const auto actual_content = ReadEntireFile(test_file_path); @@ -175,7 +175,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { stream_redirection_handle.Close(); const std::string stdout_content = testing::internal::GetCapturedStdout(); - EXPECT_EQ(stdout_content, kContent); + EXPECT_EQ(stdout_content, "hello\n"); const auto actual_content = ReadEntireFile(test_file_path); RAY_ASSERT_OK(actual_content); @@ -199,7 +199,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { stream_redirection_handle.Close(); const std::string stdout_content = testing::internal::GetCapturedStdout(); - EXPECT_EQ(stdout_content, kContent); + EXPECT_EQ(stdout_content, "hello\nworld\n"); // Pipe logger automatically adds a newliner at the end. const auto actual_content = ReadEntireFile(test_file_path); @@ -224,7 +224,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { stream_redirection_handle.Close(); const std::string stdout_content = testing::internal::GetCapturedStdout(); - EXPECT_EQ(stdout_content, kContent); + EXPECT_EQ(stdout_content, "hello\nworld\n"); const auto actual_content = ReadEntireFile(test_file_path); RAY_EXPECT_OK(actual_content); @@ -248,7 +248,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { stream_redirection_handle.Close(); const std::string stdout_content = testing::internal::GetCapturedStdout(); - EXPECT_EQ(stdout_content, kContent); + EXPECT_EQ(stdout_content, "helloworld\n\n\n"); const auto actual_content = ReadEntireFile(test_file_path); RAY_EXPECT_OK(actual_content); @@ -272,7 +272,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { stream_redirection_handle.Close(); const std::string stdout_content = testing::internal::GetCapturedStdout(); - EXPECT_EQ(stdout_content, kContent); + EXPECT_EQ(stdout_content, "hello\n\n\nworld\n"); // Pipe logger automatically adds a newliner at the end. const auto actual_content = ReadEntireFile(test_file_path); @@ -297,7 +297,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { stream_redirection_handle.Close(); const std::string stdout_content = testing::internal::GetCapturedStdout(); - EXPECT_EQ(stdout_content, kContent); + EXPECT_EQ(stdout_content, "hello\n\nworld\n\n"); // Pipe logger automatically adds a newliner at the end. const auto actual_content = ReadEntireFile(test_file_path);