[DAPHNE-daphne-eu#767] Initial HDFS support
This commit adds initial support for reading and writing files from Hadoop filesystems (HDFS) in distributed mode. Besides the read, write, and distributed functionality, it also introduces new configuration options and a new context object that manages the connection information to the distributed filesystem. Finally, this feature requires the installation of additional external dependencies; compilation of the HDFS support is therefore optional and can be activated with the --hdfs flag to build.sh.

Closes daphne-eu#767

Co-authored-by: KostasBitsakos <[email protected]>
Co-authored-by: Mark Dokter <[email protected]>
3 people committed Sep 18, 2024
1 parent 9c89049 commit d735331
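
A minimal sketch of how the pieces added here might be used end to end; the namenode address, username, script name, and binary path are placeholders, only the --hdfs build flag and the runtime knobs shown below are introduced by this commit:

    # build DAPHNE with the optional HDFS support
    ./build.sh --hdfs

    # run with HDFS enabled; the knobs are defined in src/api/internal/daphne_internal.cpp
    bin/daphne --enable-hdfs --hdfs-ip=192.168.1.10:9000 --hdfs-username=daphne script.daphne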
Showing 38 changed files with 3,460 additions and 652 deletions.
36 changes: 36 additions & 0 deletions CMakeLists.txt
@@ -169,6 +169,42 @@ if(USE_FPGAOPENCL)
add_definitions(-DUSE_FPGAOPENCL)
endif()


# HDFS library
# look through provided CMAKE_PREFIX_PATHs
option(USE_HDFS "Whether to activate compilation of HDFS support" OFF)
if(USE_HDFS)
foreach(path ${CMAKE_PREFIX_PATH})
if(NOT DEFINED HDFS_LIB_FOUND)
set(HDFS_LIB_LOCATION ${path}/hdfs)
if(EXISTS ${HDFS_LIB_LOCATION})
set(HDFS_LIB_FOUND TRUE)
else()
unset(HDFS_LIB_LOCATION)
unset(HDFS_LIB_FOUND)
endif()
endif()
endforeach(path)

# fallback if not using CMAKE_PREFIX_PATH (e.g., system/container install)
if(NOT DEFINED CMAKE_PREFIX_PATH OR NOT DEFINED HDFS_LIB_FOUND)
set(HDFS_LIB_LOCATION /usr/local/include/hdfs)
if(EXISTS ${HDFS_LIB_LOCATION})
set(HDFS_LIB_FOUND TRUE)
else()
unset(HDFS_LIB_LOCATION)
endif()
endif()

include_directories(${PROJECT_SOURCE_DIR} ${HDFS_LIB_LOCATION})
find_library(hdfs3 NAMES libhdfs3.so HINTS ${PROJECT_BINARY_DIR}/installed/lib REQUIRED)
message(STATUS "HDFS_LIB_FOUND: ${HDFS_LIB_FOUND}")
if(DEFINED HDFS_LIB_FOUND)
add_definitions(-DUSE_HDFS)
endif()
endif()


set(CMAKE_VERBOSE_MAKEFILE ON)

# *****************************************************************************
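For a build that bypasses build.sh, a hedged example of switching the new option on directly through CMake; the prefix path is a placeholder for wherever libhdfs3 and the hdfs headers are installed:

    # USE_HDFS is OFF by default; each CMAKE_PREFIX_PATH entry is searched for an hdfs/ directory
    cmake -S . -B build -DUSE_HDFS=ON -DCMAKE_PREFIX_PATH=/path/to/hdfs-deps
    cmake --build build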
3 changes: 3 additions & 0 deletions UserConfig.json
@@ -27,6 +27,9 @@
"taskPartitioningScheme": "STATIC",
"numberOfThreads": -1,
"minimumTaskSize": 1,
"useHdfs": false,
"hdfsAddress": "",
"hdfsUsername": "",
"libdir": "{exedir}/../lib",
"daphnedsl_import_paths": {},
"force_cuda": false,
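The same settings can also be supplied through the user config; the address and username below are illustrative values, only the key names come from this change:

    {
        "useHdfs": true,
        "hdfsAddress": "192.168.1.10:9000",
        "hdfsUsername": "daphne"
    }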
7 changes: 6 additions & 1 deletion src/api/cli/DaphneUserConfig.h
@@ -86,7 +86,12 @@ struct DaphneUserConfig {
size_t max_distributed_serialization_chunk_size = std::numeric_limits<int>::max() - 1024; // 2GB (-1KB to make up for gRPC headers etc.) - which is the maximum size allowed by gRPC / MPI. TODO: Investigate what might be the optimal.
int numberOfThreads = -1;
int minimumTaskSize = 1;


// hdfs
bool use_hdfs = false;
std::string hdfs_Address = "";
std::string hdfs_username = "";

// minimum considered log level (e.g., no logging below ERROR (essentially suppressing WARN, INFO, DEBUG and TRACE)
spdlog::level::level_enum log_level_limit = spdlog::level::err;
std::vector<LogConfig> loggers;
40 changes: 39 additions & 1 deletion src/api/internal/daphne_internal.cpp
@@ -132,6 +132,7 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
static OptionCategory daphneOptions("DAPHNE Options");
static OptionCategory schedulingOptions("Advanced Scheduling Knobs");
static OptionCategory distributedBackEndSetupOptions("Distributed Backend Knobs");
static OptionCategory HDFSOptions("HDFS Knobs");


// Options ----------------------------------------------------------------
@@ -154,6 +155,22 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
init(std::numeric_limits<int>::max() - 1024)
);

// HDFS knobs
static opt<bool> use_hdfs(
"enable-hdfs", cat(HDFSOptions),
desc("Enable HDFS filesystem")
);
static opt<string> hdfs_Address(
"hdfs-ip", cat(HDFSOptions),
desc("IP of the HDFS filesystem (including port)."),
init("")
);
static opt<string> hdfs_username(
"hdfs-username", cat(HDFSOptions),
desc("Username of the HDFS filesystem."),
init("")
);


// Scheduling options

@@ -404,6 +421,7 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
visibleCategories.push_back(&daphneOptions);
visibleCategories.push_back(&schedulingOptions);
visibleCategories.push_back(&distributedBackEndSetupOptions);
visibleCategories.push_back(&HDFSOptions);

HideUnrelatedOptions(visibleCategories);

@@ -484,7 +502,27 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
if(user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_MPI && user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_GRPC_SYNC && user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_GRPC_ASYNC)
spdlog::warn("No backend has been selected. Will use the default 'MPI'");
}
user_config.max_distributed_serialization_chunk_size = maxDistrChunkSize;
user_config.max_distributed_serialization_chunk_size = maxDistrChunkSize;

// only overwrite with non-defaults
if (use_hdfs) {
user_config.use_hdfs = use_hdfs;
}
if (hdfs_Address != "") {
user_config.hdfs_Address = hdfs_Address;
}
if (hdfs_username != "") {
user_config.hdfs_username = hdfs_username;
}
if (user_config.use_hdfs && (user_config.hdfs_Address == "" || user_config.hdfs_username == "")){
spdlog::warn("HDFS is enabled, but the HDFS IP address or username were not provided.");
}
#ifndef USE_HDFS
if (user_config.use_hdfs){
throw std::runtime_error("you are trying to use HDFS, but Daphne was not built with the --hdfs option\n");
}
#endif

for (auto explain : explainArgList) {
switch (explain) {
case kernels:
5 changes: 5 additions & 0 deletions src/compiler/lowering/InsertDaphneContextPass.cpp
@@ -63,6 +63,11 @@ void InsertDaphneContextPass::runOnOperation()
if (user_config.use_distributed){
builder.create<daphne::CreateDistributedContextOp>(loc);
}
#ifdef USE_HDFS
if(user_config.use_hdfs) {
builder.create<daphne::CreateHDFSContextOp>(loc);
}
#endif
#ifdef USE_FPGAOPENCL
if(user_config.use_fpgaopencl) {
builder.create<daphne::CreateFPGAContextOp>(loc);
5 changes: 5 additions & 0 deletions src/ir/daphneir/DaphneOps.td
@@ -1537,6 +1537,11 @@ def Daphne_CreateDistributedContextOp : Daphne_Op<"createDistributedContext", []
let results = (outs);
}

def Daphne_CreateHDFSContextOp : Daphne_Op<"createHDFSContext", []> {
let arguments = (ins);
let results = (outs);
}

def Daphne_CreateFPGAContextOp : Daphne_Op<"createFPGAContext", [FPGAOPENCLSupport]> {
let arguments = (ins);
let results = (outs);
6 changes: 6 additions & 0 deletions src/parser/config/ConfigParser.cpp
@@ -111,6 +111,12 @@ void ConfigParser::readUserConfig(const std::string& filename, DaphneUserConfig&
config.numberOfThreads = jf.at(DaphneConfigJsonParams::NUMBER_OF_THREADS).get<int>();
if (keyExists(jf, DaphneConfigJsonParams::MINIMUM_TASK_SIZE))
config.minimumTaskSize = jf.at(DaphneConfigJsonParams::MINIMUM_TASK_SIZE).get<int>();
if (keyExists(jf, DaphneConfigJsonParams::USE_HDFS_))
config.use_hdfs = jf.at(DaphneConfigJsonParams::USE_HDFS_).get<bool>();
if (keyExists(jf, DaphneConfigJsonParams::HDFS_ADDRESS))
config.hdfs_Address = jf.at(DaphneConfigJsonParams::HDFS_ADDRESS).get<std::string>();
if (keyExists(jf, DaphneConfigJsonParams::HDFS_USERNAME))
config.hdfs_username = jf.at(DaphneConfigJsonParams::HDFS_USERNAME).get<std::string>();
#ifdef USE_CUDA
if (keyExists(jf, DaphneConfigJsonParams::CUDA_DEVICES))
config.cuda_devices = jf.at(DaphneConfigJsonParams::CUDA_DEVICES).get<std::vector<int>>();
6 changes: 6 additions & 0 deletions src/parser/config/JsonParams.h
@@ -57,6 +57,9 @@ struct DaphneConfigJsonParams {
inline static const std::string TASK_PARTITIONING_SCHEME = "taskPartitioningScheme";
inline static const std::string NUMBER_OF_THREADS = "numberOfThreads";
inline static const std::string MINIMUM_TASK_SIZE = "minimumTaskSize";
inline static const std::string USE_HDFS_ = "useHdfs";
inline static const std::string HDFS_ADDRESS = "hdfsAddress";
inline static const std::string HDFS_USERNAME = "hdfsUsername";
inline static const std::string CUDA_DEVICES = "cuda_devices";
inline static const std::string LIB_DIR = "libdir";
inline static const std::string DAPHNEDSL_IMPORT_PATHS = "daphnedsl_import_paths";
@@ -97,6 +100,9 @@ struct DaphneConfigJsonParams {
TASK_PARTITIONING_SCHEME,
NUMBER_OF_THREADS,
MINIMUM_TASK_SIZE,
USE_HDFS_,
HDFS_ADDRESS,
HDFS_USERNAME,
CUDA_DEVICES,
LIB_DIR,
DAPHNEDSL_IMPORT_PATHS,
5 changes: 5 additions & 0 deletions src/parser/metadata/JsonKeys.h
@@ -36,6 +36,11 @@ struct JsonKeys {

// optional key
inline static const std::string NUM_NON_ZEROS = "numNonZeros"; // int (default: -1)
inline static const std::string HDFS = "hdfs"; // json
struct HDFSKeys {
inline static const std::string isHDFS = "isHDFS";
inline static const std::string HDFSFilename = "HDFSFilename";
};
};

#endif
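
Combining the new keys, a hedged sketch of a matrix .meta file that points to an HDFS-resident file; the dimensions and value type are made up, the numRows/numCols/valueType key names are assumed from the usual DAPHNE meta data layout, and only the hdfs block uses exactly the keys defined above:

    {
        "numRows": 1000,
        "numCols": 10,
        "valueType": "f64",
        "hdfs": {
            "isHDFS": true,
            "HDFSFilename": "/data.csv"
        }
    }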
86 changes: 55 additions & 31 deletions src/parser/metadata/MetaDataParser.cpp
@@ -18,14 +18,20 @@
#include <parser/metadata/JsonKeys.h>

#include <fstream>
#include <iostream>
#include <filesystem>

FileMetaData MetaDataParser::readMetaData(const std::string& filename_) {
std::string metaFilename = filename_ + ".meta";
std::ifstream ifs(metaFilename, std::ios::in);
if (!ifs.good())
throw std::runtime_error("Could not open file '" + metaFilename + "' for reading meta data.");

nlohmann::json jf = nlohmann::json::parse(ifs);
std::stringstream buffer;
buffer << ifs.rdbuf();
return MetaDataParser::readMetaDataFromString(buffer.str());
}
FileMetaData MetaDataParser::readMetaDataFromString(const std::string& str) {
nlohmann::json jf = nlohmann::json::parse(str);

if (!keyExists(jf, JsonKeys::NUM_ROWS) || !keyExists(jf, JsonKeys::NUM_COLS)) {
throw std::invalid_argument("A meta data JSON file should always contain \"" + JsonKeys::NUM_ROWS + "\" and \""
@@ -34,14 +40,21 @@ FileMetaData MetaDataParser::readMetaData(const std::string& filename_) {

const size_t numRows = jf.at(JsonKeys::NUM_ROWS).get<size_t>();
const size_t numCols = jf.at(JsonKeys::NUM_COLS).get<size_t>();
const bool isHDFS = (keyExists(jf, JsonKeys::HDFS));
const bool isSingleValueType = !(keyExists(jf, JsonKeys::SCHEMA));
const ssize_t numNonZeros = (keyExists(jf, JsonKeys::NUM_NON_ZEROS)) ? jf.at(JsonKeys::NUM_NON_ZEROS).get<ssize_t>()
: -1;

HDFSMetaData hdfs;
if (isHDFS){
// TODO check if the keys exist and throw errors if not
hdfs.isHDFS = jf.at(JsonKeys::HDFS)["isHDFS"];
hdfs.HDFSFilename = jf.at(JsonKeys::HDFS)["HDFSFilename"];
}
if (isSingleValueType) {
if (keyExists(jf, JsonKeys::VALUE_TYPE)) {
ValueTypeCode vtc = jf.at(JsonKeys::VALUE_TYPE).get<ValueTypeCode>();
return {numRows, numCols, isSingleValueType, vtc, numNonZeros};
return {numRows, numCols, isSingleValueType, vtc, numNonZeros, hdfs};
}
else {
throw std::invalid_argument("A (matrix) meta data JSON file should contain the \"" + JsonKeys::VALUE_TYPE
@@ -68,7 +81,7 @@ FileMetaData MetaDataParser::readMetaData(const std::string& filename_) {
schema.emplace_back(vtc);
labels.emplace_back(column.getLabel());
}
return {numRows, numCols, isSingleValueType, schema, labels, numNonZeros};
return {numRows, numCols, isSingleValueType, schema, labels, numNonZeros, hdfs};
}
else {
throw std::invalid_argument("A (frame) meta data JSON file should contain the \"" + JsonKeys::SCHEMA
Expand All @@ -77,39 +90,50 @@ FileMetaData MetaDataParser::readMetaData(const std::string& filename_) {
}
}

std::string MetaDataParser::writeMetaDataToString(const FileMetaData& metaData) {
nlohmann::json json;

json[JsonKeys::NUM_ROWS] = metaData.numRows;
json[JsonKeys::NUM_COLS] = metaData.numCols;

if (metaData.isSingleValueType) {
if (metaData.schema.size() != 1)
throw std::runtime_error("inappropriate meta data tried to be written to file");
json[JsonKeys::VALUE_TYPE] = metaData.schema[0];
}
else {
std::vector<SchemaColumn> schemaColumns;
// assume that the schema and labels are the same lengths
for (unsigned int i = 0; i < metaData.schema.size(); i++) {
SchemaColumn schemaColumn;
schemaColumn.setLabel(metaData.labels[i]);
schemaColumn.setValueType(metaData.schema[i]);
schemaColumns.emplace_back(schemaColumn);
}
json[JsonKeys::SCHEMA] = schemaColumns;
}

if (metaData.numNonZeros != -1)
json[JsonKeys::NUM_NON_ZEROS] = metaData.numNonZeros;

// HDFS
if (metaData.hdfs.isHDFS){
json[JsonKeys::HDFS][JsonKeys::HDFSKeys::isHDFS] = metaData.hdfs.isHDFS;
std::filesystem::path filePath(metaData.hdfs.HDFSFilename);
auto baseFileName = filePath.filename().string();

json[JsonKeys::HDFS][JsonKeys::HDFSKeys::HDFSFilename] = "/" + baseFileName;
}
return json.dump();
}
void MetaDataParser::writeMetaData(const std::string& filename_, const FileMetaData& metaData) {
std::string metaFilename = filename_ + ".meta";
std::string metaFilename = filename_ + ".meta";
std::ofstream ofs(metaFilename, std::ios::out);
if (!ofs.good())
throw std::runtime_error("could not open file '" + metaFilename + "' for writing meta data");

if(ofs.is_open()) {
nlohmann::json json;

json[JsonKeys::NUM_ROWS] = metaData.numRows;
json[JsonKeys::NUM_COLS] = metaData.numCols;

if (metaData.isSingleValueType) {
if (metaData.schema.size() != 1)
throw std::runtime_error("inappropriate meta data tried to be written to file");
json[JsonKeys::VALUE_TYPE] = metaData.schema[0];
}
else {
std::vector<SchemaColumn> schemaColumns;
// assume that the schema and labels are the same lengths
for (unsigned int i = 0; i < metaData.schema.size(); i++) {
SchemaColumn schemaColumn;
schemaColumn.setLabel(metaData.labels[i]);
schemaColumn.setValueType(metaData.schema[i]);
schemaColumns.emplace_back(schemaColumn);
}
json[JsonKeys::SCHEMA] = schemaColumns;
}

if (metaData.numNonZeros != -1)
json[JsonKeys::NUM_NON_ZEROS] = metaData.numNonZeros;

ofs << json.dump();
ofs << MetaDataParser::writeMetaDataToString(metaData);
}
else
throw std::runtime_error("could not open file '" + metaFilename + "' for writing meta data");
3 changes: 2 additions & 1 deletion src/parser/metadata/MetaDataParser.h
@@ -67,7 +67,7 @@ class MetaDataParser {
* keys or if the file doesn't contain all the metadata.
*/
static FileMetaData readMetaData(const std::string& filename);

static FileMetaData readMetaDataFromString(const std::string& str);
/**
* @brief Saves the file meta data to the specified file.
*
@@ -76,6 +76,7 @@
* @throws std::runtime_error Thrown if the specified file could not be opened.
*/
static void writeMetaData(const std::string& filename, const FileMetaData& metaData);
static std::string writeMetaDataToString(const FileMetaData& metaData);

private:
/**
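A small usage sketch of the new string-based entry points, e.g. for passing meta data to a distributed worker without touching its local filesystem; the data file name is a placeholder:

    #include <parser/metadata/MetaDataParser.h>
    #include <string>

    void shipMetaDataExample() {
        // reads "data.csv.meta" from the local filesystem
        FileMetaData md = MetaDataParser::readMetaData("data.csv");
        // serialize the meta data, e.g. to send it alongside a distributed task
        std::string serialized = MetaDataParser::writeMetaDataToString(md);
        // reconstruct it on the receiving side without a .meta file
        FileMetaData received = MetaDataParser::readMetaDataFromString(serialized);
        (void)received;
    }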