diff --git a/CMake/HermesConfig.cmake b/CMake/HermesConfig.cmake index 65a66bc8f..98af6de8c 100644 --- a/CMake/HermesConfig.cmake +++ b/CMake/HermesConfig.cmake @@ -10,99 +10,91 @@ find_path( Hermes_INCLUDE_DIR hermes/hermes_types.h + HINTS ENV PATH ENV CPATH ) -message("Hermes_INCLUDE_DIR: ${Hermes_INCLUDE_DIR}") - -if( Hermes_INCLUDE_DIR ) - get_filename_component(Hermes_DIR ${Hermes_INCLUDE_DIR} PATH) - - #----------------------------------------------------------------------------- - # Find all packages needed by Hermes - #----------------------------------------------------------------------------- - find_library( - Hermes_LIBRARY - NAMES hrun_client hrun_server - ) - - message("Hermes_LIBRARY: ${Hermes_LIBRARY}") +if (NOT Hermes_INCLUDE_DIR) + message(STATUS "FindHermes: Could not find Hermes.h") + set(Hermes_FOUND FALSE) + return() +endif() +get_filename_component(Hermes_DIR ${Hermes_INCLUDE_DIR} PATH) - # HermesShm - find_package(HermesShm CONFIG REQUIRED) - message(STATUS "found hermes_shm.h at ${HermesShm_INCLUDE_DIRS}") +#----------------------------------------------------------------------------- +# Find all packages needed by Hermes +#----------------------------------------------------------------------------- +find_library( + Hermes_LIBRARY + NAMES hrun_client hrun_server + HINTS ENV LD_LIBRARY_PATH ENV PATH +) +if (NOT Hermes_LIBRARY) + message(STATUS "FindHermes: Could not find libhrun_client.so") + set(Hermes_FOUND FALSE) + message(STATUS "LIBS: $ENV{LD_LIBRARY_PATH}") + return() +endif() - # YAML-CPP - find_package(yaml-cpp REQUIRED) - message(STATUS "found yaml-cpp at ${yaml-cpp_DIR}") +# HermesShm +find_package(HermesShm CONFIG REQUIRED) +message(STATUS "found hermes_shm.h at ${HermesShm_INCLUDE_DIRS}") - # Catch2 - find_package(Catch2 3.0.1 REQUIRED) - message(STATUS "found catch2.h at ${Catch2_CXX_INCLUDE_DIRS}") +# YAML-CPP +find_package(yaml-cpp REQUIRED) +message(STATUS "found yaml-cpp at ${yaml-cpp_DIR}") - # MPICH - if(BUILD_MPI_TESTS) - find_package(MPI REQUIRED COMPONENTS C CXX) - message(STATUS "found mpi.h at ${MPI_CXX_INCLUDE_DIRS}") - endif() +# Catch2 +find_package(Catch2 3.0.1 REQUIRED) +message(STATUS "found catch2.h at ${Catch2_CXX_INCLUDE_DIRS}") - # OpenMP - if(BUILD_OpenMP_TESTS) - find_package(OpenMP REQUIRED COMPONENTS C CXX) - message(STATUS "found omp.h at ${OpenMP_CXX_INCLUDE_DIRS}") - endif() +# MPICH +if(BUILD_MPI_TESTS) + find_package(MPI REQUIRED COMPONENTS C CXX) + message(STATUS "found mpi.h at ${MPI_CXX_INCLUDE_DIRS}") +endif() - # Cereal - find_package(cereal REQUIRED) - if(cereal) - message(STATUS "found cereal") - endif() +# OpenMP +if(BUILD_OpenMP_TESTS) + find_package(OpenMP REQUIRED COMPONENTS C CXX) + message(STATUS "found omp.h at ${OpenMP_CXX_INCLUDE_DIRS}") +endif() - # Boost - find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED) - if (Boost_FOUND) - message(STATUS "found boost at ${Boost_INCLUDE_DIRS}") - endif() +# Cereal +find_package(cereal REQUIRED) +if(cereal) + message(STATUS "found cereal") +endif() - # Thallium - find_package(thallium CONFIG REQUIRED) - if(thallium_FOUND) - message(STATUS "found thallium at ${thallium_DIR}") - endif() +# Boost +find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED) +if (Boost_FOUND) + message(STATUS "found boost at ${Boost_INCLUDE_DIRS}") +endif() - #----------------------------------------------------------------------------- - # Mark hermes as found and set all needed packages - #----------------------------------------------------------------------------- - if( Hermes_LIBRARY ) - set(Hermes_LIBRARY_DIR "") - get_filename_component(Hermes_LIBRARY_DIRS ${Hermes_LIBRARY} PATH) - # Set uncached variables as per standard. - set(Hermes_FOUND ON) - set(Hermes_INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${Hermes_INCLUDE_DIR}) - set(Hermes_LIBRARIES - ${HermesShm_LIBRARIES} - yaml-cpp - cereal::cereal - -ldl -lrt -lc -pthread - thallium - hermes - ${Boost_LIBRARIES} ${Hermes_LIBRARY}) - set(Hermes_CLIENT_LIBRARIES ${Hermes_LIBRARIES}) - set(Hermes_RUNTIME_LIBRARIES - ${Hermes_CLIENT_LIBRARIES} - hrun_runtime - ${Boost_LIBRARIES}) - set(Hermes_RUNTIME_DEPS "") - endif(Hermes_LIBRARY) +# Thallium +find_package(thallium CONFIG REQUIRED) +if(thallium_FOUND) + message(STATUS "found thallium at ${thallium_DIR}") +endif() -else(Hermes_INCLUDE_DIR) - message(STATUS "FindHermes: Could not find Hermes.h") -endif(Hermes_INCLUDE_DIR) - -if(Hermes_FOUND) - if(NOT Hermes_FIND_QUIETLY) - message(STATUS "FindHermes: Found both Hermes.h and libhrun_client.so") - endif(NOT Hermes_FIND_QUIETLY) -else(Hermes_FOUND) - if(Hermes_FIND_REQUIRED) - message(STATUS "FindHermes: Could not find Hermes.h and/or libhrun_client.so") - endif(Hermes_FIND_REQUIRED) -endif(Hermes_FOUND) +#----------------------------------------------------------------------------- +# Mark hermes as found and set all needed packages +#----------------------------------------------------------------------------- +set(Hermes_LIBRARY_DIR "") +get_filename_component(Hermes_LIBRARY_DIRS ${Hermes_LIBRARY} PATH) +# Set uncached variables as per standard. +set(Hermes_FOUND ON) +set(Hermes_INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${Hermes_INCLUDE_DIR}) +set(Hermes_LIBRARIES + ${HermesShm_LIBRARIES} + yaml-cpp + cereal::cereal + -ldl -lrt -lc -pthread + thallium + hermes + ${Boost_LIBRARIES} ${Hermes_LIBRARY}) +set(Hermes_CLIENT_LIBRARIES ${Hermes_LIBRARIES}) +set(Hermes_RUNTIME_LIBRARIES + ${Hermes_CLIENT_LIBRARIES} + hrun_runtime + ${Boost_LIBRARIES}) +set(Hermes_RUNTIME_DEPS "") diff --git a/hrun/tasks_required/TASK_NAME/include/TASK_NAME/TASK_NAME.h b/hrun/tasks_required/TASK_NAME/include/TASK_NAME/TASK_NAME.h index 0db31c06e..688839756 100644 --- a/hrun/tasks_required/TASK_NAME/include/TASK_NAME/TASK_NAME.h +++ b/hrun/tasks_required/TASK_NAME/include/TASK_NAME/TASK_NAME.h @@ -45,7 +45,7 @@ class Client : public TaskLibClient { LPointer task = AsyncCreateRoot(std::forward(args)...); task->Wait(); - Init(id_, HRUN_ADMIN->queue_id_); + Init(task->id_, HRUN_ADMIN->queue_id_); HRUN_CLIENT->DelTask(task); } diff --git a/include/hermes/bucket.h b/include/hermes/bucket.h index 2cf1dcc7d..5f4b7fa6c 100644 --- a/include/hermes/bucket.h +++ b/include/hermes/bucket.h @@ -389,18 +389,11 @@ class Bucket { /** * Reorganize a blob to a new score or node * */ - void ReorganizeBlob(const BlobId &blob_id, - float score) { - blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true); - } - - /** - * Reorganize a blob to a new score or node - * */ - void ReorganizeBlob(const BlobId &blob_id, + void ReorganizeBlob(const std::string &name, float score, - Context &ctx) { - blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true); + const Context &ctx = Context()) { + blob_mdm_->AsyncReorganizeBlobRoot( + id_, hshm::charbuf(name), BlobId::GetNull(), score, true, ctx); } /** @@ -408,9 +401,9 @@ class Bucket { * */ void ReorganizeBlob(const BlobId &blob_id, float score, - u32 node_id, - Context &ctx) { - blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, node_id, true); + const Context &ctx = Context()) { + blob_mdm_->AsyncReorganizeBlobRoot( + id_, hshm::charbuf(""), blob_id, score, true, ctx); } /** @@ -494,6 +487,9 @@ class Bucket { Context &ctx) { Blob blob; BlobId blob_id = BaseGet(blob_name, orig_blob_id, blob, 0, ctx); + if (blob.size() == 0) { + return BlobId::GetNull(); + } std::stringstream ss(std::string(blob.data(), blob.size())); cereal::BinaryInputArchive ar(ss); ar >> data; diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index a2d67cea5..525bd26a2 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -53,7 +53,9 @@ class ConfigurationManager { op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", bkt_mdm_.id_, blob_mdm_.id_); stager_mdm_.CreateRoot(DomainId::GetGlobal(), - "hermes_stager_mdm", blob_mdm_.id_); + "hermes_stager_mdm", + blob_mdm_.id_, + bkt_mdm_.id_); blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(), bkt_mdm_.id_, stager_mdm_.id_, op_mdm_.id_); diff --git a/tasks/data_stager/include/data_stager/data_stager.h b/tasks/data_stager/include/data_stager/data_stager.h index ec524856c..cd50fde1b 100644 --- a/tasks/data_stager/include/data_stager/data_stager.h +++ b/tasks/data_stager/include/data_stager/data_stager.h @@ -23,12 +23,13 @@ class Client : public TaskLibClient { LPointer AsyncCreate(const TaskNode &task_node, const DomainId &domain_id, const std::string &state_name, - const TaskStateId &blob_mdm) { + const TaskStateId &blob_mdm, + const TaskStateId &bkt_mdm) { id_ = TaskStateId::GetNull(); QueueManagerInfo &qm = HRUN_CLIENT->server_config_.queue_manager_; std::vector queue_info; return HRUN_ADMIN->AsyncCreateTaskState( - task_node, domain_id, state_name, id_, queue_info, blob_mdm); + task_node, domain_id, state_name, id_, queue_info, blob_mdm, bkt_mdm); } HRUN_TASK_NODE_ROOT(AsyncCreate) template @@ -132,6 +133,29 @@ class Client : public TaskLibClient { } HRUN_TASK_NODE_PUSH_ROOT(StageOut); + /** Stage out data to a remote source */ + HSHM_ALWAYS_INLINE + void AsyncUpdateSizeConstruct(UpdateSizeTask *task, + const TaskNode &task_node, + const BucketId &bkt_id, + const hshm::charbuf &blob_name, + size_t blob_off, + size_t data_size, + u32 task_flags) { + HRUN_CLIENT->ConstructTask( + task, task_node, id_, bkt_id, + blob_name, blob_off, data_size, task_flags); + } + HSHM_ALWAYS_INLINE + void UpdateSizeRoot(const BucketId &bkt_id, + const hshm::charbuf &blob_name, + size_t blob_off, + size_t data_size, + u32 task_flags) { + AsyncUpdateSizeRoot(bkt_id, blob_name, blob_off, data_size, task_flags); + } + HRUN_TASK_NODE_PUSH_ROOT(UpdateSize); + /** Parse url */ static inline bool GetUrlProtocolAndAction(const std::string &url, diff --git a/tasks/data_stager/include/data_stager/data_stager_lib_exec.h b/tasks/data_stager/include/data_stager/data_stager_lib_exec.h index 5e955f05e..65268cf74 100644 --- a/tasks/data_stager/include/data_stager/data_stager_lib_exec.h +++ b/tasks/data_stager/include/data_stager/data_stager_lib_exec.h @@ -28,6 +28,10 @@ void Run(u32 method, Task *task, RunContext &rctx) override { StageOut(reinterpret_cast(task), rctx); break; } + case Method::kUpdateSize: { + UpdateSize(reinterpret_cast(task), rctx); + break; + } } } /** Execute a task */ @@ -57,6 +61,10 @@ void Monitor(u32 mode, Task *task, RunContext &rctx) override { MonitorStageOut(mode, reinterpret_cast(task), rctx); break; } + case Method::kUpdateSize: { + MonitorUpdateSize(mode, reinterpret_cast(task), rctx); + break; + } } } /** Delete a task */ @@ -86,6 +94,10 @@ void Del(u32 method, Task *task) override { HRUN_CLIENT->DelTask(reinterpret_cast(task)); break; } + case Method::kUpdateSize: { + HRUN_CLIENT->DelTask(reinterpret_cast(task)); + break; + } } } /** Duplicate a task */ @@ -115,6 +127,10 @@ void Dup(u32 method, Task *orig_task, std::vector> &dups) overrid hrun::CALL_DUPLICATE(reinterpret_cast(orig_task), dups); break; } + case Method::kUpdateSize: { + hrun::CALL_DUPLICATE(reinterpret_cast(orig_task), dups); + break; + } } } /** Register the duplicate output with the origin task */ @@ -144,6 +160,10 @@ void DupEnd(u32 method, u32 replica, Task *orig_task, Task *dup_task) override { hrun::CALL_DUPLICATE_END(replica, reinterpret_cast(orig_task), reinterpret_cast(dup_task)); break; } + case Method::kUpdateSize: { + hrun::CALL_DUPLICATE_END(replica, reinterpret_cast(orig_task), reinterpret_cast(dup_task)); + break; + } } } /** Ensure there is space to store replicated outputs */ @@ -173,6 +193,10 @@ void ReplicateStart(u32 method, u32 count, Task *task) override { hrun::CALL_REPLICA_START(count, reinterpret_cast(task)); break; } + case Method::kUpdateSize: { + hrun::CALL_REPLICA_START(count, reinterpret_cast(task)); + break; + } } } /** Determine success and handle failures */ @@ -202,6 +226,10 @@ void ReplicateEnd(u32 method, Task *task) override { hrun::CALL_REPLICA_END(reinterpret_cast(task)); break; } + case Method::kUpdateSize: { + hrun::CALL_REPLICA_END(reinterpret_cast(task)); + break; + } } } /** Serialize a task when initially pushing into remote */ @@ -231,6 +259,10 @@ std::vector SaveStart(u32 method, BinaryOutputArchive &ar, T ar << *reinterpret_cast(task); break; } + case Method::kUpdateSize: { + ar << *reinterpret_cast(task); + break; + } } return ar.Get(); } @@ -268,6 +300,11 @@ TaskPointer LoadStart(u32 method, BinaryInputArchive &ar) override { ar >> *reinterpret_cast(task_ptr.ptr_); break; } + case Method::kUpdateSize: { + task_ptr.ptr_ = HRUN_CLIENT->NewEmptyTask(task_ptr.shm_); + ar >> *reinterpret_cast(task_ptr.ptr_); + break; + } } return task_ptr; } @@ -298,6 +335,10 @@ std::vector SaveEnd(u32 method, BinaryOutputArchive &ar, Ta ar << *reinterpret_cast(task); break; } + case Method::kUpdateSize: { + ar << *reinterpret_cast(task); + break; + } } return ar.Get(); } @@ -328,6 +369,10 @@ void LoadEnd(u32 replica, u32 method, BinaryInputArchive &ar, Task *task) ar.Deserialize(replica, *reinterpret_cast(task)); break; } + case Method::kUpdateSize: { + ar.Deserialize(replica, *reinterpret_cast(task)); + break; + } } } /** Get the grouping of the task */ @@ -351,6 +396,9 @@ u32 GetGroup(u32 method, Task *task, hshm::charbuf &group) override { case Method::kStageOut: { return reinterpret_cast(task)->GetGroup(group); } + case Method::kUpdateSize: { + return reinterpret_cast(task)->GetGroup(group); + } } return -1; } diff --git a/tasks/data_stager/include/data_stager/data_stager_methods.h b/tasks/data_stager/include/data_stager/data_stager_methods.h index 2f7d2af52..a1dabbbf9 100644 --- a/tasks/data_stager/include/data_stager/data_stager_methods.h +++ b/tasks/data_stager/include/data_stager/data_stager_methods.h @@ -7,6 +7,7 @@ struct Method : public TaskMethod { TASK_METHOD_T kUnregisterStager = kLast + 1; TASK_METHOD_T kStageIn = kLast + 2; TASK_METHOD_T kStageOut = kLast + 3; + TASK_METHOD_T kUpdateSize = kLast + 4; }; #endif // HRUN_DATA_STAGER_METHODS_H_ \ No newline at end of file diff --git a/tasks/data_stager/include/data_stager/data_stager_methods.yaml b/tasks/data_stager/include/data_stager/data_stager_methods.yaml index a2cac4953..0dec22c7b 100644 --- a/tasks/data_stager/include/data_stager/data_stager_methods.yaml +++ b/tasks/data_stager/include/data_stager/data_stager_methods.yaml @@ -1,4 +1,5 @@ kRegisterStager: 0 kUnregisterStager: 1 kStageIn: 2 -kStageOut: 3 \ No newline at end of file +kStageOut: 3 +kUpdateSize: 4 \ No newline at end of file diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index 8a2bdd03c..5184c84b5 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -25,6 +25,7 @@ using hrun::proc_queue::PushTask; using hrun::Admin::CreateTaskStateTask; struct ConstructTask : public CreateTaskStateTask { TaskStateId blob_mdm_; + TaskStateId bkt_mdm_; /** SHM default constructor */ HSHM_ALWAYS_INLINE explicit @@ -39,14 +40,16 @@ struct ConstructTask : public CreateTaskStateTask { const std::string &state_name, const TaskStateId &id, const std::vector &queue_info, - const TaskStateId &blob_mdm) + const TaskStateId &blob_mdm, + const TaskStateId &bkt_mdm) : CreateTaskStateTask(alloc, task_node, domain_id, state_name, "data_stager", id, queue_info) { // Custom params blob_mdm_ = blob_mdm; + bkt_mdm_ = bkt_mdm; std::stringstream ss; cereal::BinaryOutputArchive ar(ss); - ar(blob_mdm_); + ar(blob_mdm_, bkt_mdm_); std::string data = ss.str(); *custom_ = data; } @@ -56,7 +59,7 @@ struct ConstructTask : public CreateTaskStateTask { std::string data = custom_->str(); std::stringstream ss(data); cereal::BinaryInputArchive ar(ss); - ar(blob_mdm_); + ar(blob_mdm_, bkt_mdm_); } HSHM_ALWAYS_INLINE @@ -337,6 +340,57 @@ struct StageOutTask : public Task, TaskFlags { } }; +/** + * A task to stage data out in a Hermes deployment + * */ +struct UpdateSizeTask : public Task, TaskFlags { + IN hermes::BucketId bkt_id_; + IN hipc::ShmArchive blob_name_; + IN size_t blob_off_, data_size_; + + /** SHM default constructor */ + HSHM_ALWAYS_INLINE explicit + UpdateSizeTask(hipc::Allocator *alloc) : Task(alloc) {} + + /** Emplace constructor */ + HSHM_ALWAYS_INLINE explicit + UpdateSizeTask(hipc::Allocator *alloc, + const TaskNode &task_node, + const TaskStateId &state_id, + const BucketId &bkt_id, + const hshm::charbuf &blob_name, + size_t blob_off, + size_t data_size, + u32 task_flags): Task(alloc) { + // Initialize task + task_node_ = task_node; + lane_hash_ = bkt_id.hash_; + prio_ = TaskPrio::kLowLatency; + task_state_ = state_id; + method_ = Method::kUpdateSize; + task_flags_.SetBits(task_flags | TASK_FIRE_AND_FORGET | TASK_LOW_LATENCY | TASK_REMOTE_DEBUG_MARK); + domain_id_ = DomainId::GetLocal(); + + // Custom params + bkt_id_ = bkt_id; + HSHM_MAKE_AR(blob_name_, alloc, blob_name); + blob_off_ = blob_off; + data_size_ = data_size; + } + + /** Destructor */ + HSHM_ALWAYS_INLINE + ~UpdateSizeTask() { + HSHM_DESTROY_AR(blob_name_) + } + + /** Create group */ + HSHM_ALWAYS_INLINE + u32 GetGroup(hshm::charbuf &group) { + return TASK_UNORDERED; + } +}; + } // namespace hrun::data_stager #endif // HRUN_TASKS_TASK_TEMPL_INCLUDE_data_stager_data_stager_TASKS_H_ diff --git a/tasks/data_stager/include/data_stager/factory/abstract_stager.h b/tasks/data_stager/include/data_stager/factory/abstract_stager.h index 02483101c..80e000f20 100644 --- a/tasks/data_stager/include/data_stager/factory/abstract_stager.h +++ b/tasks/data_stager/include/data_stager/factory/abstract_stager.h @@ -6,6 +6,7 @@ #define HERMES_TASKS_DATA_STAGER_SRC_ABSTRACT_STAGER_H_ #include "../data_stager.h" +#include "hermes_bucket_mdm/hermes_bucket_mdm.h" namespace hermes::data_stager { @@ -20,6 +21,7 @@ class AbstractStager { virtual void RegisterStager(RegisterStagerTask *task, RunContext &rctx) = 0; virtual void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) = 0; virtual void StageOut(blob_mdm::Client &blob_mdm, StageOutTask *task, RunContext &rctx) = 0; + virtual void UpdateSize(bucket_mdm::Client &bkt_mdm, UpdateSizeTask *task, RunContext &rctx) = 0; }; } // namespace hermes diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index 2b40d15db..cbc921dc7 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -14,6 +14,7 @@ class BinaryFileStager : public AbstractStager { public: size_t page_size_; std::string path_; + bitfield32_t flags_; public: /** Default constructor */ @@ -23,20 +24,26 @@ class BinaryFileStager : public AbstractStager { ~BinaryFileStager() {} /** Build context for staging */ - static Context BuildContext(size_t page_size) { + static Context BuildContext(size_t page_size, + u32 flags = 0, + size_t elmt_size = 1) { Context ctx; ctx.flags_.SetBits(HERMES_SHOULD_STAGE); - ctx.bkt_params_ = BuildFileParams(page_size); + ctx.bkt_params_ = BuildFileParams(page_size, flags, elmt_size); return ctx; } /** Build serialized file parameter pack */ - static std::string BuildFileParams(size_t page_size) { - std::string params; + static std::string BuildFileParams(size_t page_size, + u32 flags = 0, + size_t elmt_size = 1) { + hshm::charbuf params(32); + page_size = (page_size / elmt_size) * elmt_size; hrun::LocalSerialize srl(params); srl << std::string("file"); + srl << flags; srl << page_size; - return params; + return params.str(); } /** Create the data stager payload */ @@ -45,12 +52,16 @@ class BinaryFileStager : public AbstractStager { std::string protocol; hrun::LocalDeserialize srl(params); srl >> protocol; + srl >> flags_.bits_; srl >> page_size_; path_ = task->tag_name_->str(); } /** Stage data in from remote source */ void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) override { + if (flags_.Any(HERMES_STAGE_NO_READ)) { + return; + } adapter::BlobPlacement plcmnt; plcmnt.DecodeBlobName(*task->blob_name_, page_size_); HILOG(kDebug, "Attempting to stage {} bytes from the backend file {} at offset {}", @@ -68,8 +79,8 @@ class BinaryFileStager : public AbstractStager { (off_t)plcmnt.bucket_off_); HERMES_POSIX_API->close(fd); if (real_size < 0) { - HELOG(kError, "Failed to stage in {} bytes from {}", - page_size_, path_); +// HELOG(kError, "Failed to stage in {} bytes from {}", +// page_size_, path_); HRUN_CLIENT->FreeBuffer(blob); return; } else if (real_size == 0) { @@ -95,6 +106,9 @@ class BinaryFileStager : public AbstractStager { /** Stage data out to remote source */ void StageOut(blob_mdm::Client &blob_mdm, StageOutTask *task, RunContext &rctx) override { + if (flags_.Any(HERMES_STAGE_NO_WRITE)) { + return; + } adapter::BlobPlacement plcmnt; plcmnt.DecodeBlobName(*task->blob_name_, page_size_); HILOG(kDebug, "Attempting to stage {} bytes to the backend file {} at offset {}", @@ -117,6 +131,16 @@ class BinaryFileStager : public AbstractStager { HILOG(kDebug, "Staged out {} bytes to the backend file {}", real_size, path_); } + + void UpdateSize(bucket_mdm::Client &bkt_mdm, UpdateSizeTask *task, RunContext &rctx) override { + adapter::BlobPlacement p; + std::string blob_name_str = task->blob_name_->str(); + p.DecodeBlobName(blob_name_str, page_size_); + bkt_mdm.AsyncUpdateSize(task->task_node_ + 1, + task->bkt_id_, + p.bucket_off_ + task->blob_off_ + task->data_size_, + bucket_mdm::UpdateSizeMode::kCap); + } }; } // namespace hermes::data_stager diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 08275d05a..319c3469a 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -9,6 +9,7 @@ #include "hermes_adapters/posix/posix_api.h" #include "hermes_blob_mdm/hermes_blob_mdm.h" #include "data_stager/factory/stager_factory.h" +#include "hermes_bucket_mdm/hermes_bucket_mdm.h" namespace hermes::data_stager { @@ -16,6 +17,7 @@ class Server : public TaskLib { public: std::vector>> url_map_; blob_mdm::Client blob_mdm_; + bucket_mdm::Client bkt_mdm_; public: Server() = default; @@ -25,6 +27,7 @@ class Server : public TaskLib { task->Deserialize(); url_map_.resize(HRUN_QM_RUNTIME->max_lanes_); blob_mdm_.Init(task->blob_mdm_, HRUN_ADMIN->queue_id_); + bkt_mdm_.Init(task->bkt_mdm_, HRUN_ADMIN->queue_id_); HILOG(kInfo, "(node {}) BLOB MDM: {}", HRUN_CLIENT->node_id_, blob_mdm_.id_); task->SetModuleComplete(); } @@ -99,6 +102,22 @@ class Server : public TaskLib { void MonitorStageOut(u32 mode, StageOutTask *task, RunContext &rctx) { } + /** Update the size of the bucket */ + void UpdateSize(UpdateSizeTask *task, RunContext &rctx) { + std::unordered_map>::iterator it = + url_map_[rctx.lane_id_].find(task->bkt_id_); + if (it == url_map_[rctx.lane_id_].end()) { + HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); + task->SetModuleComplete(); + return; + } + std::unique_ptr &stager = it->second; + stager->UpdateSize(bkt_mdm_, task, rctx); + task->SetModuleComplete(); + } + void MonitorUpdateSize(u32 mode, UpdateSizeTask *task, RunContext &rctx) { + } + public: #include "data_stager/data_stager_lib_exec.h" }; diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h index 3cb367c9d..6bb285e14 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h @@ -154,7 +154,8 @@ class Client : public TaskLibClient { Context ctx = Context(), u32 flags = 0) { LPointer> push_task = - AsyncGetBlobRoot(tag_id, hshm::charbuf(""), blob_id, off, data_size, data, ctx, flags); + AsyncGetBlobRoot(tag_id, hshm::charbuf(""), + blob_id, off, data_size, data, ctx, flags); push_task->Wait(); GetBlobTask *task = push_task->get(); data = task->data_; @@ -174,15 +175,16 @@ class Client : public TaskLibClient { void AsyncReorganizeBlobConstruct(ReorganizeBlobTask *task, const TaskNode &task_node, const TagId &tag_id, + const hshm::charbuf &blob_name, const BlobId &blob_id, float score, - u32 node_id, bool user_score, + const Context &ctx = Context(), u32 task_flags = TASK_LOW_LATENCY | TASK_FIRE_AND_FORGET) { // HILOG(kDebug, "Beginning REORGANIZE (task_node={})", task_node); HRUN_CLIENT->ConstructTask( task, task_node, DomainId::GetNode(blob_id.node_id_), id_, - tag_id, blob_id, score, node_id, user_score, task_flags); + tag_id, blob_name, blob_id, score, user_score, ctx, task_flags); } HRUN_TASK_NODE_PUSH_ROOT(ReorganizeBlob); diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h index f3307a772..0e217aee2 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h @@ -229,10 +229,12 @@ class PutBlobPhase { #define HERMES_BLOB_APPEND BIT_OPT(u32, 1) #define HERMES_DID_STAGE_IN BIT_OPT(u32, 2) #define HERMES_SHOULD_STAGE BIT_OPT(u32, 3) -#define HERMES_BLOB_DID_CREATE BIT_OPT(u32, 4) -#define HERMES_GET_BLOB_ID BIT_OPT(u32, 5) -#define HERMES_HAS_DERIVED BIT_OPT(u32, 6) -#define HERMES_USER_SCORE_STATIONARY BIT_OPT(u32, 7) +#define HERMES_STAGE_NO_WRITE BIT_OPT(u32, 4) +#define HERMES_STAGE_NO_READ BIT_OPT(u32, 5) +#define HERMES_BLOB_DID_CREATE BIT_OPT(u32, 6) +#define HERMES_GET_BLOB_ID BIT_OPT(u32, 7) +#define HERMES_HAS_DERIVED BIT_OPT(u32, 8) +#define HERMES_USER_SCORE_STATIONARY BIT_OPT(u32, 9) /** A task to put data in a blob */ struct PutBlobTask : public Task, TaskFlags { @@ -1082,6 +1084,7 @@ struct ReorganizeBlobPhase { /** A task to reorganize a blob's composition in the hierarchy */ struct ReorganizeBlobTask : public Task, TaskFlags { + IN hipc::ShmArchive blob_name_; IN BlobId blob_id_; IN float score_; IN u32 node_id_; @@ -1104,10 +1107,11 @@ struct ReorganizeBlobTask : public Task, TaskFlags { const DomainId &domain_id, const TaskStateId &state_id, const TagId &tag_id, + const hshm::charbuf &blob_name, const BlobId &blob_id, float score, - u32 node_id, bool is_user_score, + const Context &ctx, u32 task_flags = TASK_LOW_LATENCY | TASK_FIRE_AND_FORGET) : Task(alloc) { // Initialize task task_node_ = task_node; @@ -1120,17 +1124,23 @@ struct ReorganizeBlobTask : public Task, TaskFlags { // Custom params tag_id_ = tag_id; + HSHM_MAKE_AR(blob_name_, alloc, blob_name); blob_id_ = blob_id; score_ = score; - node_id_ = node_id; + node_id_ = ctx.node_id_; is_user_score_ = is_user_score; } + /** Destructor */ + ~ReorganizeBlobTask() { + HSHM_DESTROY_AR(blob_name_) + } + /** (De)serialize message call */ template void SerializeStart(Ar &ar) { task_serialize(ar); - ar(tag_id_, blob_id_, score_, node_id_); + ar(tag_id_, blob_name_, blob_id_, score_, node_id_); } /** (De)serialize message return */ diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 3ab584a35..1aa9e3bfe 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -273,11 +273,13 @@ class Server : public TaskLib { float new_score = MakeScore(blob_info, now); blob_info.score_ = new_score; if (ShouldReorganize(blob_info, new_score, task->task_node_)) { + Context ctx; LPointer reorg_task = blob_mdm_.AsyncReorganizeBlob(task->task_node_ + 1, blob_info.tag_id_, + hshm::charbuf(""), blob_info.blob_id_, - new_score, 0, false, + new_score, false, ctx, TASK_LOW_LATENCY); reorg_task->Wait(task); HRUN_CLIENT->DelTask(reorg_task); @@ -469,14 +471,11 @@ class Server : public TaskLib { // Update information if (task->flags_.Any(HERMES_SHOULD_STAGE)) { - // TODO(llogan): Move to data stager - adapter::BlobPlacement p; - std::string blob_name_str = task->blob_name_->str(); - p.DecodeBlobName(blob_name_str, 1 << 20); - bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, - task->tag_id_, - p.bucket_off_ + task->blob_off_ + task->data_size_, - bucket_mdm::UpdateSizeMode::kCap); + stager_mdm_.AsyncUpdateSize(task->task_node_ + 1, + task->tag_id_, + blob_info.name_, + task->blob_off_, + task->data_size_, 0); } else { bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, task->tag_id_, @@ -859,6 +858,12 @@ class Server : public TaskLib { void ReorganizeBlob(ReorganizeBlobTask *task, RunContext &rctx) { switch (task->phase_) { case ReorganizeBlobPhase::kGet: { + hshm::charbuf blob_name = hshm::to_charbuf(*task->blob_name_); + if (task->blob_id_.IsNull()) { + bitfield32_t flags; + task->blob_id_ = GetOrCreateBlobId(task->tag_id_, task->lane_hash_, + blob_name, rctx, flags); + } BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { diff --git a/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc b/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc index 9c81cacca..786e96f8b 100644 --- a/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc +++ b/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc @@ -323,7 +323,7 @@ class Server : public TaskLib { } stager_mdm_.AsyncUnregisterStager(task->task_node_ + 1, task->tag_id_); - HILOG(kInfo, "Destroying the tag: {}", tag.name_.str()); + HILOG(kDebug, "Destroying the tag: {}", tag.name_.str()); task->phase_ = DestroyTagPhase::kWaitDestroyBlobs; return; } @@ -340,7 +340,7 @@ class Server : public TaskLib { HSHM_DESTROY_AR(task->destroy_blob_tasks_); TAG_MAP_T &tag_map = tag_map_[rctx.lane_id_]; tag_map.erase(task->tag_id_); - HILOG(kInfo, "Finished destroying the tag"); + HILOG(kDebug, "Finished destroying the tag"); task->SetModuleComplete(); } } diff --git a/tasks/posix_bdev/src/posix_bdev.cc b/tasks/posix_bdev/src/posix_bdev.cc index 3a73f8e02..3be455dee 100644 --- a/tasks/posix_bdev/src/posix_bdev.cc +++ b/tasks/posix_bdev/src/posix_bdev.cc @@ -115,7 +115,7 @@ class Server : public TaskLib, public bdev::Server { } } #else - ssize_t count = pwrite(fd_, task->buf_, task->size_, (off_t)task->disk_off_); + ssize_t count = pwrite64(fd_, task->buf_, task->size_, (off64_t)task->disk_off_); if (count != task->size_) { HELOG(kError, "BORG: wrote {} bytes, but expected {}: {}", count, task->size_, strerror(errno)); @@ -169,7 +169,7 @@ class Server : public TaskLib, public bdev::Server { } } #else - ssize_t count = pread(fd_, task->buf_, task->size_, (off_t)task->disk_off_); + ssize_t count = pread64(fd_, task->buf_, task->size_, (off64_t)task->disk_off_); if (count != task->size_) { HELOG(kError, "BORG: read {} bytes, but expected {}", count, task->size_); diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 6f5a60ce6..f713999b3 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -317,7 +317,7 @@ TEST_CASE("TestHermesReorganizeBlob") { hermes::Blob blob(KILOBYTES(4)); memset(blob.data(), i % 256, blob.size()); hermes::BlobId blob_id = bkt.Put(std::to_string(i), blob, ctx); - bkt.ReorganizeBlob(blob_id, .5, 0, ctx); + bkt.ReorganizeBlob(blob_id, .5, ctx); hermes::Blob blob2; bkt.Get(blob_id, blob2, ctx); REQUIRE(blob.size() == blob2.size());