Skip to content

Commit

Permalink
Merge pull request #633 from lukemartinlogan/dev
Browse files Browse the repository at this point in the history
Make it so flushing doesn't cause stall
  • Loading branch information
lukemartinlogan authored Oct 25, 2023
2 parents 37853ac + 3e18cf7 commit f776fad
Show file tree
Hide file tree
Showing 53 changed files with 1,386 additions and 616 deletions.
1 change: 0 additions & 1 deletion benchmark/test_latency.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "hermes_shm/util/timer.h"
#include "hrun/work_orchestrator/affinity.h"
#include "hermes/hermes.h"
#include "hrun/work_orchestrator/worker.h"
#include "hrun/api/hrun_runtime.h"

/** The performance of getting a queue */
Expand Down
3 changes: 1 addition & 2 deletions ci/hermes/packages/hermes/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ class Hermes(CMakePackage):

version('master',
branch='master', submodules=True)
version('dev', git='https://github.com/lukemartinlogan/hermes.git',
branch='dev', submodules=True)
version('dev', branch='dev', submodules=True)
version('dev-priv', git='https://github.com/lukemartinlogan/hermes.git',
branch='dev', submodules=True)
version("1.0.5-beta", sha256="1f3ba51a8beda4bc1314d6541b800de1525f5e233a6f498fcde6dc43562ddcb7")
Expand Down
15 changes: 15 additions & 0 deletions codegen/refresh_methods
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def refresh_methods(TASK_ROOT):
if TASK_NAME != 'hrun_admin':
methods.insert(0, ('kConstruct', -2))
methods.insert(1, ('kDestruct', -1))
monitor_modes = ['kEstTime', 'kTrainTime', 'kFlushStat']

# Produce the TASK_NAME_methods.h file
lines = []
Expand Down Expand Up @@ -67,6 +68,20 @@ def refresh_methods(TASK_ROOT):
lines += [' }']
lines += ['}']

## Create the Monitor method
lines += ['/** Execute a task */',
'void Monitor(u32 mode, Task *task, RunContext &rctx) override {',
' switch (task->method_) {']
for method_enum_name, method_off in methods:
method_name = method_enum_name.replace('k', '', 1)
task_name = method_name + "Task"
lines += [f' case Method::{method_enum_name}: {{',
f' Monitor{method_name}(mode, reinterpret_cast<{task_name} *>(task), rctx);',
f' break;',
f' }}']
lines += [' }']
lines += ['}']

## Create the Del method
lines += ['/** Delete a task */',
'void Del(u32 method, Task *task) override {',
Expand Down
11 changes: 7 additions & 4 deletions hermes_adapters/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "hermes_shm/util/singleton.h"
#include "filesystem_mdm.h"
#include "hermes_adapters/mapper/mapper_factory.h"
#include "data_stager/factory/stager_factory.h"

#include <fcntl.h>
#include <filesystem>
Expand Down Expand Up @@ -55,23 +56,25 @@ void Filesystem::Open(AdapterStat &stat, File &f, const std::string &path) {
return;
}
}
// Update page size
stat.page_size_ = mdm->GetAdapterPageSize(path);
// Get or create the bucket
hshm::charbuf url = hermes::data_stager::BinaryFileStager::BuildFileUrl(
stat.path_, stat.page_size_);
if (stat.hflags_.Any(HERMES_FS_TRUNC)) {
// The file was opened with TRUNCATION
stat.bkt_id_ = HERMES->GetBucket(stat.path_, ctx, 0, HERMES_BUCKET_IS_FILE);
stat.bkt_id_ = HERMES->GetBucket(url.str(), ctx, 0, HERMES_IS_FILE);
stat.bkt_id_.Clear();
} else {
// The file was opened regularly
stat.file_size_ = io_client_->GetSize(*path_shm);
stat.bkt_id_ = HERMES->GetBucket(stat.path_, ctx, stat.file_size_, HERMES_BUCKET_IS_FILE);
stat.bkt_id_ = HERMES->GetBucket(url.str(), ctx, stat.file_size_, HERMES_IS_FILE);
}
HILOG(kDebug, "File has size: {}", stat.bkt_id_.GetSize());
// Update file position pointer
if (stat.hflags_.Any(HERMES_FS_APPEND)) {
stat.st_ptr_ = std::numeric_limits<size_t>::max();
}
// Update page size
stat.page_size_ = mdm->GetAdapterPageSize(path);
// Allocate internal hermes data
auto stat_ptr = std::make_shared<AdapterStat>(stat);
FilesystemIoClientState fs_ctx(&mdm->fs_mdm_, (void*)stat_ptr.get());
Expand Down
201 changes: 8 additions & 193 deletions hrun/include/hrun/api/hrun_runtime.h
Original file line number Diff line number Diff line change
@@ -1,196 +1,11 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* Distributed under BSD 3-Clause license. *
* Copyright by The HDF Group. *
* Copyright by the Illinois Institute of Technology. *
* All rights reserved. *
* *
* This file is part of Hermes. The full Hermes copyright notice, including *
* terms governing use, modification, and redistribution, is contained in *
* the COPYING file, which can be found at the top directory. If you do not *
* have access to the file, you may request a copy from [email protected]. *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
//
// Created by lukemartinlogan on 10/24/23.
//

#ifndef HRUN_INCLUDE_HRUN_CLIENT_HRUN_SERVER_H_
#define HRUN_INCLUDE_HRUN_CLIENT_HRUN_SERVER_H_
#ifndef HERMES_HRUN_INCLUDE_HRUN_API_HRUN_RUNTIME_H_
#define HERMES_HRUN_INCLUDE_HRUN_API_HRUN_RUNTIME_H_

#include "hrun/task_registry/task_registry.h"
#include "hrun/work_orchestrator/work_orchestrator.h"
#include "hrun/queue_manager/queue_manager_runtime.h"
#include "hrun_admin/hrun_admin.h"
#include "remote_queue/remote_queue.h"
#include "hrun_client.h"
#include "manager.h"
#include "hrun/network/rpc.h"
#include "hrun/network/rpc_thallium.h"
#include "hrun_runtime_.h"
#include "hrun/work_orchestrator/worker.h"

// Singleton macros
#define HRUN_RUNTIME hshm::Singleton<hrun::Runtime>::GetInstance()
#define HRUN_RUNTIME_T hrun::Runtime*
#define HRUN_REMOTE_QUEUE (&HRUN_RUNTIME->remote_queue_)
#define HRUN_THALLIUM (&HRUN_RUNTIME->thallium_)
#define HRUN_RPC (&HRUN_RUNTIME->rpc_)

namespace hrun {

class Runtime : public ConfigurationManager {
public:
int data_;
TaskRegistry task_registry_;
WorkOrchestrator work_orchestrator_;
QueueManagerRuntime queue_manager_;
remote_queue::Client remote_queue_;
RpcContext rpc_;
ThalliumRpc thallium_;
bool remote_created_ = false;

public:
/** Default constructor */
Runtime() = default;

/** Create the server-side API */
Runtime* Create(std::string server_config_path = "") {
hshm::ScopedMutex lock(lock_, 1);
if (is_initialized_) {
return this;
}
mode_ = HrunMode::kServer;
is_being_initialized_ = true;
ServerInit(std::move(server_config_path));
is_initialized_ = true;
is_being_initialized_ = false;
return this;
}

private:
/** Initialize */
void ServerInit(std::string server_config_path) {
LoadServerConfig(server_config_path);
InitSharedMemory();
rpc_.ServerInit(&server_config_);
thallium_.ServerInit(&rpc_);
header_->node_id_ = rpc_.node_id_;
header_->unique_ = 0;
header_->num_nodes_ = server_config_.rpc_.host_names_.size();
task_registry_.ServerInit(&server_config_, rpc_.node_id_, header_->unique_);
// Queue manager + client must be initialized before Work Orchestrator
queue_manager_.ServerInit(main_alloc_,
rpc_.node_id_,
&server_config_,
header_->queue_manager_);
HRUN_CLIENT->Create(server_config_path, "", true);
HERMES_THREAD_MODEL->SetThreadModel(hshm::ThreadType::kPthread);
work_orchestrator_.ServerInit(&server_config_, queue_manager_);
hipc::mptr<Admin::CreateTaskStateTask> admin_task;

// Create the admin library
HRUN_CLIENT->MakeTaskStateId();
admin_task = hipc::make_mptr<Admin::CreateTaskStateTask>();
task_registry_.RegisterTaskLib("hrun_admin");
task_registry_.CreateTaskState(
"hrun_admin",
"hrun_admin",
HRUN_QM_CLIENT->admin_task_state_,
admin_task.get());

// Create the process queue
HRUN_CLIENT->MakeTaskStateId();
admin_task = hipc::make_mptr<Admin::CreateTaskStateTask>();
task_registry_.RegisterTaskLib("proc_queue");
task_registry_.CreateTaskState(
"proc_queue",
"proc_queue",
HRUN_QM_CLIENT->process_queue_,
admin_task.get());

// Create the work orchestrator queue scheduling library
TaskStateId queue_sched_id = HRUN_CLIENT->MakeTaskStateId();
admin_task = hipc::make_mptr<Admin::CreateTaskStateTask>();
task_registry_.RegisterTaskLib("worch_queue_round_robin");
task_registry_.CreateTaskState(
"worch_queue_round_robin",
"worch_queue_round_robin",
queue_sched_id,
admin_task.get());

// Create the work orchestrator process scheduling library
TaskStateId proc_sched_id = HRUN_CLIENT->MakeTaskStateId();
admin_task = hipc::make_mptr<Admin::CreateTaskStateTask>();
task_registry_.RegisterTaskLib("worch_proc_round_robin");
task_registry_.CreateTaskState(
"worch_proc_round_robin",
"worch_proc_round_robin",
proc_sched_id,
admin_task.get());

// Set the work orchestrator queue scheduler
HRUN_ADMIN->SetWorkOrchQueuePolicyRoot(hrun::DomainId::GetLocal(), queue_sched_id);
HRUN_ADMIN->SetWorkOrchProcPolicyRoot(hrun::DomainId::GetLocal(), proc_sched_id);

// Create the remote queue library
task_registry_.RegisterTaskLib("remote_queue");
remote_queue_.CreateRoot(DomainId::GetLocal(), "remote_queue",
HRUN_CLIENT->MakeTaskStateId());
remote_created_ = true;
}

public:
/** Initialize shared-memory between daemon and client */
void InitSharedMemory() {
// Create shared-memory allocator
auto mem_mngr = HERMES_MEMORY_MANAGER;
if (server_config_.queue_manager_.shm_size_ == 0) {
server_config_.queue_manager_.shm_size_ =
hipc::MemoryManager::GetDefaultBackendSize();
}
mem_mngr->CreateBackend<hipc::PosixShmMmap>(
server_config_.queue_manager_.shm_size_,
server_config_.queue_manager_.shm_name_);
main_alloc_ =
mem_mngr->CreateAllocator<hipc::ScalablePageAllocator>(
server_config_.queue_manager_.shm_name_,
main_alloc_id_,
sizeof(HrunShm));
header_ = main_alloc_->GetCustomHeader<HrunShm>();
}

/** Finalize Hermes explicitly */
void Finalize() {}

/** Run the Hermes core Daemon */
void RunDaemon() {
thallium_.RunDaemon();
HILOG(kInfo, "Daemon is running")
// while (HRUN_WORK_ORCHESTRATOR->IsRuntimeAlive()) {
// // Scheduler callbacks?
// HERMES_THREAD_MODEL->SleepForUs(1000);
// }
HILOG(kInfo, "Finishing up last requests")
HRUN_WORK_ORCHESTRATOR->Join();
HILOG(kInfo, "Daemon is exiting")
}

/** Stop the Hermes core Daemon */
void StopDaemon() {
HRUN_WORK_ORCHESTRATOR->FinalizeRuntime();
}

/** Get the set of DomainIds */
std::vector<DomainId> ResolveDomainId(const DomainId &domain_id) {
std::vector<DomainId> ids;
if (domain_id.IsGlobal()) {
ids.reserve(rpc_.hosts_.size());
for (HostInfo &host_info : rpc_.hosts_) {
ids.push_back(DomainId::GetNode(host_info.node_id_));
}
} else if (domain_id.IsNode()) {
ids.reserve(1);
ids.push_back(domain_id);
}
// TODO(llogan): handle named domain ID sets
return ids;
}
};

} // namespace hrun

#endif // HRUN_INCLUDE_HRUN_CLIENT_HRUN_SERVER_H_
#endif // HERMES_HRUN_INCLUDE_HRUN_API_HRUN_RUNTIME_H_
76 changes: 76 additions & 0 deletions hrun/include/hrun/api/hrun_runtime_.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* Distributed under BSD 3-Clause license. *
* Copyright by The HDF Group. *
* Copyright by the Illinois Institute of Technology. *
* All rights reserved. *
* *
* This file is part of Hermes. The full Hermes copyright notice, including *
* terms governing use, modification, and redistribution, is contained in *
* the COPYING file, which can be found at the top directory. If you do not *
* have access to the file, you may request a copy from [email protected]. *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

#ifndef HRUN_INCLUDE_HRUN_CLIENT_HRUN_SERVER_H_
#define HRUN_INCLUDE_HRUN_CLIENT_HRUN_SERVER_H_

#include "hrun/task_registry/task_registry.h"
#include "hrun/work_orchestrator/work_orchestrator.h"
#include "hrun/queue_manager/queue_manager_runtime.h"
#include "hrun_admin/hrun_admin.h"
#include "remote_queue/remote_queue.h"
#include "hrun_client.h"
#include "manager.h"
#include "hrun/network/rpc.h"
#include "hrun/network/rpc_thallium.h"

// Singleton macros
#define HRUN_RUNTIME hshm::Singleton<hrun::Runtime>::GetInstance()
#define HRUN_RUNTIME_T hrun::Runtime*
#define HRUN_REMOTE_QUEUE (&HRUN_RUNTIME->remote_queue_)
#define HRUN_THALLIUM (&HRUN_RUNTIME->thallium_)
#define HRUN_RPC (&HRUN_RUNTIME->rpc_)

namespace hrun {

class Runtime : public ConfigurationManager {
public:
int data_;
TaskRegistry task_registry_;
WorkOrchestrator work_orchestrator_;
QueueManagerRuntime queue_manager_;
remote_queue::Client remote_queue_;
RpcContext rpc_;
ThalliumRpc thallium_;
bool remote_created_ = false;

public:
/** Default constructor */
Runtime() = default;

/** Create the server-side API */
Runtime* Create(std::string server_config_path = "");

private:
/** Initialize */
void ServerInit(std::string server_config_path);

public:
/** Initialize shared-memory between daemon and client */
void InitSharedMemory();

/** Finalize Hermes explicitly */
void Finalize();

/** Run the Hermes core Daemon */
void RunDaemon();

/** Stop the Hermes core Daemon */
void StopDaemon();

/** Get the set of DomainIds */
std::vector<DomainId> ResolveDomainId(const DomainId &domain_id);
};

} // namespace hrun

#endif // HRUN_INCLUDE_HRUN_CLIENT_HRUN_SERVER_H_
2 changes: 1 addition & 1 deletion hrun/include/hrun/queue_manager/queue_manager_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class QueueManagerRuntime : public QueueManager {
void ServerInit(hipc::Allocator *alloc, u32 node_id, ServerConfig *config, QueueManagerShm &shm) {
config_ = config;
Init(node_id);
QueueManagerInfo &qm = config_->queue_manager_;
config::QueueManagerInfo &qm = config_->queue_manager_;
// Initialize ticket queue (ticket 0 is for admin queue)
max_queues_ = qm.max_queues_;
max_lanes_ = qm.max_lanes_;
Expand Down
4 changes: 3 additions & 1 deletion hrun/include/hrun/task_registry/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ static inline std::ostream &operator<<(std::ostream &os, const TaskNode &obj) {
#define TF_SRL_ASYM (TF_SRL_ASYM_START | TF_SRL_ASYM_END)
/** This task uses replication */
#define TF_REPLICA BIT_OPT(u32, 31)
/** This task uses duplication */
/** This task is intended to be used only locally */
#define TF_LOCAL BIT_OPT(u32, 5)
/** This task supports monitoring of all sub-methods */
#define TF_MONITOR BIT_OPT(u32, 6)

/** All tasks inherit this to easily check if a class is a task using SFINAE */
class IsTask {};
Expand Down Expand Up @@ -219,6 +220,7 @@ struct TaskFlags : public IsTask {
TASK_FLAG_T SRL_SYM_START = FLAGS & TF_SRL_SYM_START;
TASK_FLAG_T SRL_SYM_END = FLAGS & TF_SRL_SYM_END;
TASK_FLAG_T REPLICA = FLAGS & TF_REPLICA;
TASK_FLAG_T MONITOR = FLAGS & TF_MONITOR;
};

/** The type of a compile-time task flag */
Expand Down
Loading

0 comments on commit f776fad

Please sign in to comment.