Skip to content

Commit

Permalink
Merge pull request #644 from lukemartinlogan/dev
Browse files Browse the repository at this point in the history
Sort targets by bandwidth for now.
  • Loading branch information
lukemartinlogan authored Nov 5, 2023
2 parents f950fc5 + ac64c10 commit 5895b2a
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 13 deletions.
3 changes: 1 addition & 2 deletions hermes_adapters/posix/posix_fs_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ class PosixFs : public hermes::adapter::fs::Filesystem {
std::vector<char> filename(kMaxPathLen);
snprintf(proclnk.data(), kMaxPathLen - 1, "/proc/self/fd/%d", fd);
size_t r = readlink(proclnk.data(), filename.data(), kMaxPathLen);
filename[r] = '\0';
return std::string(filename.data(), filename.size());
return std::string(filename.data(), r);
}
};

Expand Down
22 changes: 13 additions & 9 deletions tasks/data_stager/include/data_stager/factory/binary_stager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace hermes::data_stager {

class BinaryFileStager : public AbstractStager {
public:
int fd_;
int fd_ = -1;
size_t page_size_;
std::string path_;

Expand All @@ -21,9 +21,7 @@ class BinaryFileStager : public AbstractStager {
BinaryFileStager() = default;

/** Destructor */
~BinaryFileStager() {
HERMES_POSIX_API->close(fd_);
}
~BinaryFileStager() {}

/** Build file url */
static hshm::charbuf BuildFileUrl(const std::string &path, size_t page_size) {
Expand All @@ -48,10 +46,6 @@ class BinaryFileStager : public AbstractStager {
/** Create the data stager payload */
void RegisterStager(RegisterStagerTask *task, RunContext &rctx) override {
ParseFileUrl(task->url_->str(), path_, page_size_);
fd_ = HERMES_POSIX_API->open(path_.c_str(), O_RDWR);
if (fd_ < 0) {
HELOG(kError, "Failed to open file {}", path_);
}
}

/** Stage data in from remote source */
Expand All @@ -61,6 +55,11 @@ class BinaryFileStager : public AbstractStager {
HILOG(kDebug, "Attempting to stage {} bytes from the backend file {} at offset {}",
page_size_, url_, plcmnt.bucket_off_);
LPointer<char> blob = HRUN_CLIENT->AllocateBuffer<TASK_YIELD_STD>(page_size_);
fd_ = HERMES_POSIX_API->open(path_.c_str(), O_CREAT | O_RDWR, 0666);
if (fd_ < 0) {
HELOG(kError, "Failed to open file {}", path_);
return;
}
ssize_t real_size = HERMES_POSIX_API->pread(fd_,
blob.ptr_,
page_size_,
Expand All @@ -72,7 +71,6 @@ class BinaryFileStager : public AbstractStager {
} else if (real_size == 0) {
return;
}
// memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size);
HILOG(kDebug, "Staged {} bytes from the backend file {}",
real_size, url_);
HILOG(kDebug, "Submitting put blob {} ({}) to blob mdm ({})",
Expand All @@ -97,10 +95,16 @@ class BinaryFileStager : public AbstractStager {
HILOG(kDebug, "Attempting to stage {} bytes to the backend file {} at offset {}",
page_size_, url_, plcmnt.bucket_off_);
char *data = HRUN_CLIENT->GetDataPointer(task->data_);
fd_ = HERMES_POSIX_API->open(path_.c_str(), O_CREAT | O_RDWR, 0666);
if (fd_ < 0) {
HELOG(kError, "Failed to open file {}", path_);
return;
}
ssize_t real_size = HERMES_POSIX_API->pwrite(fd_,
data,
task->data_size_,
(off_t)plcmnt.bucket_off_);
HERMES_POSIX_API->close(fd_);
if (real_size < 0) {
HELOG(kError, "Failed to stage out {} bytes from {}",
task->data_size_, url_);
Expand Down
4 changes: 4 additions & 0 deletions tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class Server : public TaskLib {
client.AsyncCreateComplete(tgt_task);
target_map_.emplace(client.id_, &client);
}
std::sort(targets_.begin(), targets_.end(),
[](const bdev::Client &a, const bdev::Client &b) {
return a.bandwidth_ > b.bandwidth_;
});
blob_mdm_.Init(id_);
HILOG(kInfo, "(node {}) Created Blob MDM", HRUN_CLIENT->node_id_);
task->SetModuleComplete();
Expand Down
1 change: 1 addition & 0 deletions test/unit/hermes_adapters/filesystem_tests.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class FilesystemTests {
/* Delete the files from both Hermes and the backend. */
TrackAllFiles();
RemoveAllFiles();
Flush();
}

virtual void RegisterFiles() = 0;
Expand Down
5 changes: 3 additions & 2 deletions test/unit/hermes_adapters/stdio/stdio_adapter_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ TEST_CASE("SingleWrite", "[process=" + std::to_string(TEST_INFO->comm_size_) +
REQUIRE(TEST_INFO->status_orig_ == 0);
}

SECTION("write to new file") {
SECTION("write to new file") {
TEST_INFO->test_fopen(TEST_INFO->new_file_, "w+");
REQUIRE(TEST_INFO->fh_orig_ != nullptr);
TEST_INFO->test_fwrite(TEST_INFO->write_data_.data(), TEST_INFO->request_size_);
TEST_INFO->test_fwrite(TEST_INFO->write_data_.data(),
TEST_INFO->request_size_);
REQUIRE(TEST_INFO->size_written_orig_ == TEST_INFO->request_size_);
TEST_INFO->test_fclose();
REQUIRE(TEST_INFO->status_orig_ == 0);
Expand Down

0 comments on commit 5895b2a

Please sign in to comment.