diff --git a/hermes_adapters/posix/posix_fs_api.h b/hermes_adapters/posix/posix_fs_api.h index 1aa5f473d..77ed16c94 100644 --- a/hermes_adapters/posix/posix_fs_api.h +++ b/hermes_adapters/posix/posix_fs_api.h @@ -98,8 +98,7 @@ class PosixFs : public hermes::adapter::fs::Filesystem { std::vector filename(kMaxPathLen); snprintf(proclnk.data(), kMaxPathLen - 1, "/proc/self/fd/%d", fd); size_t r = readlink(proclnk.data(), filename.data(), kMaxPathLen); - filename[r] = '\0'; - return std::string(filename.data(), filename.size()); + return std::string(filename.data(), r); } }; diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index bd7bc6cc2..8ddae035f 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -12,7 +12,7 @@ namespace hermes::data_stager { class BinaryFileStager : public AbstractStager { public: - int fd_; + int fd_ = -1; size_t page_size_; std::string path_; @@ -21,9 +21,7 @@ class BinaryFileStager : public AbstractStager { BinaryFileStager() = default; /** Destructor */ - ~BinaryFileStager() { - HERMES_POSIX_API->close(fd_); - } + ~BinaryFileStager() {} /** Build file url */ static hshm::charbuf BuildFileUrl(const std::string &path, size_t page_size) { @@ -48,10 +46,6 @@ class BinaryFileStager : public AbstractStager { /** Create the data stager payload */ void RegisterStager(RegisterStagerTask *task, RunContext &rctx) override { ParseFileUrl(task->url_->str(), path_, page_size_); - fd_ = HERMES_POSIX_API->open(path_.c_str(), O_RDWR); - if (fd_ < 0) { - HELOG(kError, "Failed to open file {}", path_); - } } /** Stage data in from remote source */ @@ -61,6 +55,11 @@ class BinaryFileStager : public AbstractStager { HILOG(kDebug, "Attempting to stage {} bytes from the backend file {} at offset {}", page_size_, url_, plcmnt.bucket_off_); LPointer blob = HRUN_CLIENT->AllocateBuffer(page_size_); + fd_ = HERMES_POSIX_API->open(path_.c_str(), O_CREAT | O_RDWR, 0666); + if (fd_ < 0) { + HELOG(kError, "Failed to open file {}", path_); + return; + } ssize_t real_size = HERMES_POSIX_API->pread(fd_, blob.ptr_, page_size_, @@ -72,7 +71,6 @@ class BinaryFileStager : public AbstractStager { } else if (real_size == 0) { return; } - // memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size); HILOG(kDebug, "Staged {} bytes from the backend file {}", real_size, url_); HILOG(kDebug, "Submitting put blob {} ({}) to blob mdm ({})", @@ -97,10 +95,16 @@ class BinaryFileStager : public AbstractStager { HILOG(kDebug, "Attempting to stage {} bytes to the backend file {} at offset {}", page_size_, url_, plcmnt.bucket_off_); char *data = HRUN_CLIENT->GetDataPointer(task->data_); + fd_ = HERMES_POSIX_API->open(path_.c_str(), O_CREAT | O_RDWR, 0666); + if (fd_ < 0) { + HELOG(kError, "Failed to open file {}", path_); + return; + } ssize_t real_size = HERMES_POSIX_API->pwrite(fd_, data, task->data_size_, (off_t)plcmnt.bucket_off_); + HERMES_POSIX_API->close(fd_); if (real_size < 0) { HELOG(kError, "Failed to stage out {} bytes from {}", task->data_size_, url_); diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index ba9ee34a4..e3d486f91 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -90,6 +90,10 @@ class Server : public TaskLib { client.AsyncCreateComplete(tgt_task); target_map_.emplace(client.id_, &client); } + std::sort(targets_.begin(), targets_.end(), + [](const bdev::Client &a, const bdev::Client &b) { + return a.bandwidth_ > b.bandwidth_; + }); blob_mdm_.Init(id_); HILOG(kInfo, "(node {}) Created Blob MDM", HRUN_CLIENT->node_id_); task->SetModuleComplete(); diff --git a/test/unit/hermes_adapters/filesystem_tests.h b/test/unit/hermes_adapters/filesystem_tests.h index cef1104ea..daffba071 100644 --- a/test/unit/hermes_adapters/filesystem_tests.h +++ b/test/unit/hermes_adapters/filesystem_tests.h @@ -193,6 +193,7 @@ class FilesystemTests { /* Delete the files from both Hermes and the backend. */ TrackAllFiles(); RemoveAllFiles(); + Flush(); } virtual void RegisterFiles() = 0; diff --git a/test/unit/hermes_adapters/stdio/stdio_adapter_basic_test.cc b/test/unit/hermes_adapters/stdio/stdio_adapter_basic_test.cc index 5f693c5a1..c26a2dab7 100644 --- a/test/unit/hermes_adapters/stdio/stdio_adapter_basic_test.cc +++ b/test/unit/hermes_adapters/stdio/stdio_adapter_basic_test.cc @@ -81,10 +81,11 @@ TEST_CASE("SingleWrite", "[process=" + std::to_string(TEST_INFO->comm_size_) + REQUIRE(TEST_INFO->status_orig_ == 0); } - SECTION("write to new file") { + SECTION("write to new file") { TEST_INFO->test_fopen(TEST_INFO->new_file_, "w+"); REQUIRE(TEST_INFO->fh_orig_ != nullptr); - TEST_INFO->test_fwrite(TEST_INFO->write_data_.data(), TEST_INFO->request_size_); + TEST_INFO->test_fwrite(TEST_INFO->write_data_.data(), + TEST_INFO->request_size_); REQUIRE(TEST_INFO->size_written_orig_ == TEST_INFO->request_size_); TEST_INFO->test_fclose(); REQUIRE(TEST_INFO->status_orig_ == 0);