Skip to content

Commit

Permalink
Use KvikIO to enable file's fast host read and host write (#17764)
Browse files Browse the repository at this point in the history
This PR makes the following improvements to I/O:
- Remove legacy cuFile integration to simplify code maintenance. Use KvikIO to manage the GDS setting and compatibility mode.
- Remove file utility classes and functions. Use KvikIO for all file-related operations.
- Replace the in-house implementations of `host_read` (for `file_source`) and `host_write` (for `file_sink`) with KvikIO's parallel counterparts.
- Update the documentation on compatibility mode/GDS.

Closes #16418 
Issue #17228

Authors:
  - Tianyu Liu (https://github.com/kingcrimsontianyu)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Shruti Shivakumar (https://github.com/shrshi)
  - Bradley Dice (https://github.com/bdice)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #17764
  • Loading branch information
kingcrimsontianyu authored Feb 13, 2025
1 parent e6b1c0f commit 725f9eb
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 867 deletions.
13 changes: 1 addition & 12 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ option(CUDA_ENABLE_LINEINFO
option(CUDA_WARNINGS_AS_ERRORS "Enable -Werror=all-warnings for all CUDA compilation" ON)
# cudart can be statically linked or dynamically linked. The python ecosystem wants dynamic linking
option(CUDA_STATIC_RUNTIME "Statically link the CUDA runtime" OFF)
option(CUDA_STATIC_CUFILE "Statically link cuFile" OFF)

set(DEFAULT_CUDF_BUILD_STREAMS_TEST_UTIL ON)
if(CUDA_STATIC_RUNTIME OR NOT BUILD_SHARED_LIBS)
Expand Down Expand Up @@ -546,7 +545,6 @@ add_library(
src/io/utilities/data_casting.cu
src/io/utilities/data_sink.cpp
src/io/utilities/datasource.cpp
src/io/utilities/file_io_utilities.cpp
src/io/utilities/row_selection.cpp
src/io/utilities/type_inference.cu
src/io/utilities/trie.cu
Expand Down Expand Up @@ -922,15 +920,6 @@ target_compile_definitions(
# Enable remote IO through KvikIO
target_compile_definitions(cudf PRIVATE $<$<BOOL:${CUDF_KVIKIO_REMOTE_IO}>:CUDF_KVIKIO_REMOTE_IO>)

# Enable cuFile support
set(_cufile_suffix)
if(CUDA_STATIC_CUFILE)
set(_cufile_suffix _static)
endif()
if(TARGET CUDA::cuFile${_cufile_suffix})
target_compile_definitions(cudf PRIVATE CUDF_CUFILE_FOUND)
endif()

# Remove this after upgrading to a CCCL that has a proper CMake option. See
# https://github.com/NVIDIA/cccl/pull/2844
target_compile_definitions(cudf PRIVATE THRUST_FORCE_32_BIT_OFFSET_TYPE=1)
Expand All @@ -943,7 +932,7 @@ target_link_libraries(
cudf
PUBLIC CCCL::CCCL rapids_logger::rapids_logger rmm::rmm $<BUILD_LOCAL_INTERFACE:BS::thread_pool>
PRIVATE $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp> cuco::cuco ZLIB::ZLIB nvcomp::nvcomp
kvikio::kvikio $<TARGET_NAME_IF_EXISTS:CUDA::cuFile${_cufile_suffix}> nanoarrow
kvikio::kvikio nanoarrow
)

# Add Conda library, and include paths if specified
Expand Down
21 changes: 3 additions & 18 deletions cpp/include/cudf/io/config_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,22 +19,7 @@

namespace CUDF_EXPORT cudf {
namespace io {
namespace cufile_integration {

/**
* @brief Returns true if cuFile and its compatibility mode are enabled.
*/
bool is_always_enabled();

/**
* @brief Returns true if only direct IO through cuFile is enabled (compatibility mode is disabled).
*/
bool is_gds_enabled();

/**
* @brief Returns true if KvikIO is enabled.
*/
bool is_kvikio_enabled();
namespace kvikio_integration {

/**
* @brief Set KvikIO parameters, including:
Expand All @@ -45,7 +30,7 @@ bool is_kvikio_enabled();
*/
void set_up_kvikio();

} // namespace cufile_integration
} // namespace kvikio_integration

namespace nvcomp_integration {

Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/data_sink.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -122,7 +122,7 @@ class data_sink {
*
* In the case where the sink type is itself a memory buffered write, this ends up
* being effectively a second memcpy. So a useful optimization for a "smart"
* custom data_sink is to do it's own internal management of the movement
* custom data_sink is to do its own internal management of the movement
* of data between cpu and gpu; turning the internals of the writer into simply
*
* sink->device_write(device_buffer, size)
Expand Down
36 changes: 8 additions & 28 deletions cpp/src/io/utilities/config_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,46 +24,26 @@

namespace cudf::io {

namespace cufile_integration {

namespace {
/**
* @brief Defines which cuFile usage to enable.
*/
enum class usage_policy : uint8_t { OFF, GDS, ALWAYS, KVIKIO };

/**
* @brief Get the current usage policy.
*/
usage_policy get_env_policy()
{
static auto const env_val = getenv_or<std::string>("LIBCUDF_CUFILE_POLICY", "KVIKIO");
if (env_val == "OFF") return usage_policy::OFF;
if (env_val == "GDS") return usage_policy::GDS;
if (env_val == "ALWAYS") return usage_policy::ALWAYS;
if (env_val == "KVIKIO") return usage_policy::KVIKIO;
CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val);
}
} // namespace

bool is_always_enabled() { return get_env_policy() == usage_policy::ALWAYS; }

bool is_gds_enabled() { return is_always_enabled() or get_env_policy() == usage_policy::GDS; }

bool is_kvikio_enabled() { return get_env_policy() == usage_policy::KVIKIO; }
namespace kvikio_integration {

/**
 * @brief Applies the KvikIO settings used for file I/O.
 *
 * The body is wrapped in `std::call_once` on a function-local static flag, so calls made after
 * the first successful one do nothing. The settings applied are the compatibility mode (read
 * from `KVIKIO_COMPAT_MODE`) and the thread pool size (read from `KVIKIO_NTHREADS`).
 */
void set_up_kvikio()
{
static std::once_flag flag{};
std::call_once(flag, [] {
// Workaround for https://github.com/rapidsai/cudf/issues/14140, where cuFileDriverOpen errors
// out if no CUDA calls have been made before it. This is a no-op if the CUDA context is already
// initialized.
// Keep this call ahead of the KvikIO configuration below so that a CUDA call has been made first.
cudaFree(nullptr);

// Compatibility mode comes from the environment; `kvikio::CompatMode::ON` is the value passed
// as the default.
auto const compat_mode = kvikio::getenv_or("KVIKIO_COMPAT_MODE", kvikio::CompatMode::ON);
kvikio::defaults::compat_mode_reset(compat_mode);

// Thread pool size comes from the environment; 4 is the value passed as the default.
auto const nthreads = getenv_or<unsigned int>("KVIKIO_NTHREADS", 4u);
kvikio::defaults::thread_pool_nthreads_reset(nthreads);
});
}
} // namespace cufile_integration

} // namespace kvikio_integration

namespace nvcomp_integration {

Expand Down
62 changes: 20 additions & 42 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

#include "file_io_utilities.hpp"

#include <cudf/io/config_utils.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/logger.hpp>
Expand All @@ -25,8 +23,6 @@

#include <rmm/cuda_stream_view.hpp>

#include <fstream>

namespace cudf {
namespace io {

Expand All @@ -37,47 +33,36 @@ class file_sink : public data_sink {
public:
explicit file_sink(std::string const& filepath)
{
detail::force_init_cuda_context();
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
if (!_output_stream.is_open()) { detail::throw_on_file_open_failure(filepath, true); }

if (cufile_integration::is_kvikio_enabled()) {
cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath, "w");
CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.",
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
} else {
_cufile_out = detail::make_cufile_output(filepath);
}
kvikio_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath, "w");
CUDF_EXPECTS(!_kvikio_file.closed(), "KvikIO did not open the file successfully.");
CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.",
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
}

// Marked as NOLINT because we are calling a virtual method in the destructor
~file_sink() override { flush(); } // NOLINT

void host_write(void const* data, size_t size) override
{
_output_stream.seekp(_bytes_written);
_output_stream.write(static_cast<char const*>(data), size);
_kvikio_file.pwrite(data, size, _bytes_written).get();
_bytes_written += size;
}

void flush() override { _output_stream.flush(); }
void flush() override
{
// Intentionally a no-op. kvikio::FileHandle::pwrite() makes system calls that write directly
// to the kernel buffer cache; no application-level buffer is involved, so there is nothing for
// ::fflush() or ofstream::flush() to flush.
}

size_t bytes_written() override { return _bytes_written; }

[[nodiscard]] bool supports_device_write() const override
{
return !_kvikio_file.closed() || _cufile_out != nullptr;
}
[[nodiscard]] bool supports_device_write() const override { return true; }

[[nodiscard]] bool is_device_write_preferred(size_t size) const override
{
if (!supports_device_write()) { return false; }

// Always prefer device writes if kvikio is enabled
if (!_kvikio_file.closed()) { return true; }

return size >= _gds_write_preferred_threshold;
return supports_device_write();
}

std::future<void> device_write_async(void const* gpu_data,
Expand All @@ -89,14 +74,11 @@ class file_sink : public data_sink {
size_t const offset = _bytes_written;
_bytes_written += size;

if (!_kvikio_file.closed()) {
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
return std::async(std::launch::deferred, [this, gpu_data, size, offset] {
_kvikio_file.pwrite(gpu_data, size, offset).get();
});
}
return _cufile_out->write_async(gpu_data, offset, size);
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
return std::async(std::launch::deferred, [this, gpu_data, size, offset]() -> void {
_kvikio_file.pwrite(gpu_data, size, offset).get();
});
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
Expand All @@ -105,12 +87,8 @@ class file_sink : public data_sink {
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
std::unique_ptr<detail::cufile_output_impl> _cufile_out;
kvikio::FileHandle _kvikio_file;
// The write size above which GDS is faster then d2h-copy + posix-write
static constexpr size_t _gds_write_preferred_threshold = 128 << 10; // 128KB
};

/**
Expand Down Expand Up @@ -162,7 +140,7 @@ class void_sink : public data_sink {
rmm::cuda_stream_view stream) override
{
_bytes_written += size;
return std::async(std::launch::deferred, [] {});
return std::async(std::launch::deferred, []() -> void {});
}

void flush() override {}
Expand Down
Loading

0 comments on commit 725f9eb

Please sign in to comment.