Skip to content

Commit

Permalink
Merge pull request #645 from lukemartinlogan/dev
Browse files Browse the repository at this point in the history
Various updates
  • Loading branch information
lukemartinlogan authored Nov 15, 2023
2 parents 5895b2a + 7020424 commit 73e3b76
Show file tree
Hide file tree
Showing 45 changed files with 1,306 additions and 215 deletions.
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ if(PkgConfig)
message(STATUS "found pkg config")
endif()

# LIBAIO
# Try to locate a library named `aio` and store the result in LIBAIO_LIBRARY.
find_library(LIBAIO_LIBRARY NAMES aio)
if(LIBAIO_LIBRARY)
message(STATUS "found libaio at ${LIBAIO_LIBRARY}")
else()
# Not found by find_library: fall back to the bare library name `aio`.
# NOTE(review): presumably the linker then resolves it through search paths
# exported by the project's libaio spack package -- confirm.
set(LIBAIO_LIBRARY aio)
message(STATUS "Assuming it was installed with our aio spack")
endif()

# Zeromq
#pkg_check_modules(ZMQ REQUIRED libzmq)
#include_directories(${ZMQ_INCLUDE_DIRS})
Expand Down
4 changes: 4 additions & 0 deletions benchmark/hermes_api_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ void GetTest(int nprocs, int rank,
/**
 * Run PutTest followed by GetTest, with a barrier and a global flush
 * in between. All parameters are forwarded unchanged to both PutTest
 * and GetTest.
 */
void PutGetTest(int nprocs, int rank, int repeat,
size_t blobs_per_rank, size_t blob_size) {
PutTest(nprocs, rank, repeat, blobs_per_rank, blob_size);
HILOG(kInfo, "Beginning barrier")
// Synchronize all ranks of MPI_COMM_WORLD before the flush is issued
MPI_Barrier(MPI_COMM_WORLD);
HILOG(kInfo, "Beginning flushing")
// Issue a flush on the global domain before any Get is attempted
// NOTE(review): presumably blocks until outstanding work is done -- confirm
HRUN_ADMIN->FlushRoot(DomainId::GetGlobal());
HILOG(kInfo, "Finished flushing")
GetTest(nprocs, rank, repeat, blobs_per_rank, blob_size);
}

Expand Down
11 changes: 6 additions & 5 deletions benchmark/test_latency.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ TEST_CASE("TestHshmQueueGetLane") {

/** Single-thread performance of getting, emplacing, and popping a queue */
TEST_CASE("TestHshmQueueAllocateEmplacePop") {
TRANSPARENT_HERMES();
hrun::QueueId qid(0, 3);
std::vector<PriorityInfo> queue_info = {
{16, 16, 256, 0}
Expand Down Expand Up @@ -251,8 +252,7 @@ TEST_CASE("TestWorkerLatency") {

/** Time to process a request */
TEST_CASE("TestRoundTripLatency") {
TRANSPARENT_HRUN();
HERMES->ClientInit();
TRANSPARENT_HERMES();
hrun::small_message::Client client;
HRUN_ADMIN->RegisterTaskLibRoot(hrun::DomainId::GetLocal(), "small_message");
// int count = 25;
Expand All @@ -263,14 +263,15 @@ TEST_CASE("TestRoundTripLatency") {
client.CreateRoot(hrun::DomainId::GetLocal(), "ipc_test");
hshm::Timer t;

int pid = getpid();
ProcessAffiner::SetCpuAffinity(pid, 8);
// int pid = getpid();
// ProcessAffiner::SetCpuAffinity(pid, 8);

t.Resume();
size_t ops = (1 << 20);
// size_t ops = 1024;
for (size_t i = 0; i < ops; ++i) {
client.MdPushRoot(hrun::DomainId::GetLocal());
client.MdRoot(hrun::DomainId::GetLocal());
// client.MdPushRoot(hrun::DomainId::GetLocal());
}
t.Pause();

Expand Down
1 change: 1 addition & 0 deletions ci/hermes/packages/hermes/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Hermes(CMakePackage):
depends_on('mpi')
depends_on('cereal')
depends_on('yaml-cpp')
depends_on('libaio')
depends_on('[email protected]')
depends_on('[email protected]: +context +fiber +filesystem +system +atomic +chrono +serialization +signals +pic +regex')
depends_on('libfabric fabrics=sockets,tcp,udp,verbs',
Expand Down
1 change: 1 addition & 0 deletions ci/hermes/packages/hermes_shm/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class HermesShm(CMakePackage):
depends_on('mpi')
depends_on('cereal')
depends_on('yaml-cpp')
depends_on('libaio')
depends_on('[email protected]')
depends_on('[email protected]: +context +fiber +filesystem +system +atomic +chrono +serialization +signals +pic +regex')
depends_on('libfabric fabrics=sockets,tcp,udp,verbs',
Expand Down
57 changes: 57 additions & 0 deletions ci/hermes/packages/libaio/package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)


from spack.package import *


class Libaio(MakefilePackage):
    """Linux native Asynchronous I/O interface library.
    AIO enables even a single application thread to overlap I/O operations
    with other processing, by providing an interface for submitting one or
    more I/O requests in one system call (io_submit()) without waiting for
    completion, and a separate interface (io_getevents()) to reap completed
    I/O operations associated with a given completion group.
    """

    homepage = "http://lse.sourceforge.net/io/aio.html"
    url = (
        "https://debian.inf.tu-dresden.de/debian/pool/main/liba/libaio/libaio_0.3.110.orig.tar.gz"
    )

    version("0.3.113", sha256="2c44d1c5fd0d43752287c9ae1eb9c023f04ef848ea8d4aafa46e9aedb678200b")
    version("0.3.110", sha256="e019028e631725729376250e32b473012f7cb68e1f7275bfc1bbcdd0f8745f7e")

    conflicts("platform=darwin", msg="libaio is a linux specific library")

    @property
    def install_targets(self):
        """Arguments passed to `make` for installation into the spec prefix."""
        return ["prefix={0}".format(self.spec.prefix), "install"]

    def set_include(self, env, path):
        """Expose `path` as a header directory through compiler flags and
        the INCLUDE / CPATH variables of `env`."""
        env.append_flags('CFLAGS', '-I{}'.format(path))
        env.append_flags('CXXFLAGS', '-I{}'.format(path))
        env.prepend_path('INCLUDE', '{}'.format(path))
        env.prepend_path('CPATH', '{}'.format(path))

    def set_lib(self, env, path):
        """Expose `path` as a library directory through LDFLAGS and the
        LIBRARY_PATH / LD_LIBRARY_PATH / PYTHONPATH variables of `env`."""
        env.prepend_path('LIBRARY_PATH', path)
        env.prepend_path('LD_LIBRARY_PATH', path)
        env.append_flags('LDFLAGS', '-L{}'.format(path))
        env.prepend_path('PYTHONPATH', '{}'.format(path))

    def set_flags(self, env):
        """Register this package's include, lib, lib64 and cmake
        directories in `env`."""
        # The include directory is registered once; registering it twice
        # only duplicated the -I flags and the INCLUDE/CPATH entries.
        self.set_include(env, '{}/include'.format(self.prefix))
        self.set_lib(env, '{}/lib'.format(self.prefix))
        self.set_lib(env, '{}/lib64'.format(self.prefix))
        env.prepend_path('CMAKE_PREFIX_PATH', '{}/cmake'.format(self.prefix))

    # NOTE(review): this is the legacy Spack hook name; newer Spack releases
    # use setup_dependent_build_environment(env, dependent_spec) -- confirm
    # which Spack version this repository targets.
    def setup_dependent_environment(self, spack_env, run_env, dependent_spec):
        """Apply the package flags to the dependent's build environment."""
        self.set_flags(spack_env)

    def setup_run_environment(self, env):
        """Apply the package flags to the run environment."""
        self.set_flags(env)
8 changes: 6 additions & 2 deletions config/hermes_server_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,12 @@ system_view_state_update_interval_ms: 1000

### Runtime orchestration settings
work_orchestrator:
# The number of worker threads to spawn
max_workers: 4
# The max number of dedicated worker threads
max_dworkers: 4
# The max number of overlapping threads
max_oworkers: 32
# The number of overlapping worker threads per core
owork_per_core: 32

### Queue Manager settings
queue_manager:
Expand Down
8 changes: 6 additions & 2 deletions hrun/config/hrun_server_default.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
### Runtime orchestration settings
work_orchestrator:
# The number of worker threads to spawn
max_workers: 4
# The max number of dedicated worker threads
max_dworkers: 4
# The max number of overlapping threads
max_oworkers: 32
# The number of overlapping worker threads per core
owork_per_core: 32

### Queue Manager settings
queue_manager:
Expand Down
51 changes: 18 additions & 33 deletions hrun/include/hrun/api/hrun_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,64 +242,49 @@ class Client : public ConfigurationManager {
/** Allocate a buffer in a task */
template<int THREAD_MODEL>
HSHM_ALWAYS_INLINE
LPointer<char> AllocateBuffer(size_t size, Task *yield_task) {
LPointer<char> AllocateBufferServer(size_t size, Task *yield_task) {
LPointer<char> p;
// HILOG(kInfo, "Heap size: {}", data_alloc_->GetCurrentlyAllocatedSize());
while (true) {
try {
p = data_alloc_->AllocateLocalPtr<char>(size);
} catch (...) {
p.shm_.SetNull();
}
if (!p.shm_.IsNull()) {
break;
}
HILOG(kInfo, "{} Could not allocate buffer of size {} (2)?", THREAD_MODEL, size);
Yield<THREAD_MODEL>(yield_task);
}
p = main_alloc_->AllocateLocalPtr<char>(size);
return p;
}

/** Allocate a buffer */
template<int THREAD_MODEL>
HSHM_ALWAYS_INLINE
LPointer<char> AllocateBuffer(size_t size) {
// HILOG(kInfo, "{} Heap size: {}", THREAD_MODEL, data_alloc_->GetCurrentlyAllocatedSize());
LPointer<char> AllocateBufferServer(size_t size) {
LPointer<char> p;
while (true) {
try {
p = data_alloc_->AllocateLocalPtr<char>(size);
} catch (...) {
p.shm_.SetNull();
}
if (!p.shm_.IsNull()) {
break;
}
Yield<THREAD_MODEL>();
HILOG(kInfo, "{} Could not allocate buffer of size {} (1)?", THREAD_MODEL, size);
}
p = main_alloc_->AllocateLocalPtr<char>(size);
return p;
}

/**
 * Free a buffer.
 *
 * The allocator is looked up from the allocator id stored in the pointer
 * and the buffer is released exactly once through that allocator.
 *
 * @param p pointer to the buffer being freed
 */
HSHM_ALWAYS_INLINE
void FreeBuffer(hipc::Pointer &p) {
  auto alloc = HERMES_MEMORY_MANAGER->GetAllocator(p.allocator_id_);
  alloc->Free(p);
  // Report the size of the allocator named in the message, not data_alloc_
  HILOG(kDebug, "Heap size (1) for {}/{}: {}",
        p.allocator_id_.bits_.major_,
        p.allocator_id_.bits_.minor_,
        alloc->GetCurrentlyAllocatedSize());
}

/**
 * Free a buffer.
 *
 * The allocator is looked up from the allocator id stored in the
 * pointer's shm_ member and the buffer is released exactly once
 * through that allocator.
 *
 * @param p local pointer to the buffer being freed
 */
HSHM_ALWAYS_INLINE
void FreeBuffer(LPointer<char> &p) {
  auto alloc = HERMES_MEMORY_MANAGER->GetAllocator(p.shm_.allocator_id_);
  alloc->FreeLocalPtr(p);
  // Report the size of the allocator named in the message, not data_alloc_
  HILOG(kDebug, "Heap size (2) for {}/{}: {}",
        alloc->GetId().bits_.major_,
        alloc->GetId().bits_.minor_,
        alloc->GetCurrentlyAllocatedSize());
}

/**
 * Convert a pointer to a T* (char* by default).
 *
 * The conversion goes through the allocator identified by the
 * allocator id stored in the pointer.
 *
 * @param p pointer to convert
 * @return the result of the allocator's Convert<T, hipc::Pointer>(p)
 */
template<typename T = char>
HSHM_ALWAYS_INLINE
T* GetDataPointer(const hipc::Pointer &p) {
  auto alloc = HERMES_MEMORY_MANAGER->GetAllocator(p.allocator_id_);
  return alloc->Convert<T, hipc::Pointer>(p);
}

/** Get a queue by its ID */
Expand Down
8 changes: 6 additions & 2 deletions hrun/include/hrun/config/config_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ namespace hrun::config {
* Work orchestrator information defined in server config
* */
struct WorkOrchestratorInfo {
/** Maximum number of workers to spawn */
size_t max_workers_;
/** Maximum number of dedicated workers */
size_t max_dworkers_;
/** Maximum number of overlapping workers */
size_t max_oworkers_;
/** Overlapped workers per core */
size_t owork_per_core_;
};

/**
Expand Down
8 changes: 6 additions & 2 deletions hrun/include/hrun/config/config_server_default.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
const inline char* kHrunServerDefaultConfigStr =
"### Runtime orchestration settings\n"
"work_orchestrator:\n"
" # The number of worker threads to spawn\n"
" max_workers: 4\n"
" # The max number of dedicated worker threads\n"
" max_dworkers: 4\n"
" # The max number of overlapping threads\n"
" max_oworkers: 32\n"
" # The max number of total dedicated cores\n"
" owork_per_core: 32\n"
"\n"
"### Queue Manager settings\n"
"queue_manager:\n"
Expand Down
11 changes: 9 additions & 2 deletions hrun/include/hrun/queue_manager/queues/hshm_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define HRUN_INCLUDE_HRUN_QUEUE_MANAGER_HSHM_QUEUE_H_

#include "hrun/queue_manager/queue.h"
#include "mpsc_queue.h"

namespace hrun {

Expand All @@ -24,7 +25,7 @@ struct LaneData {
};

/** Represents a lane tasks can be stored */
typedef hipc::mpsc_queue<LaneData> Lane;
typedef hrun::mpsc_queue<LaneData> Lane;

/** Prioritization of different lanes in the queue */
struct LaneGroup : public PriorityInfo {
Expand Down Expand Up @@ -75,6 +76,12 @@ struct LaneGroup : public PriorityInfo {
return flags_.Any(QUEUE_LONG_RUNNING) || prio_ == 0;
}

/** Check if this group has the QUEUE_LOW_LATENCY flag set */
HSHM_ALWAYS_INLINE
bool IsLowLatency() {
return flags_.Any(QUEUE_LOW_LATENCY);
}

/**
 * Get the lane at index lane_id in this group's lane container.
 * This function performs no range check of its own on lane_id.
 */
Lane& GetLane(u32 lane_id) {
return (*lanes_)[lane_id];
}
Expand Down Expand Up @@ -121,7 +128,7 @@ struct MultiQueueT<Hshm> : public hipc::ShmContainer {
lane_group.lanes_->reserve(prio_info.max_lanes_);
lane_group.prio_ = prio;
for (u32 lane_id = 0; lane_id < lane_group.num_lanes_; ++lane_id) {
lane_group.lanes_->emplace_back(lane_group.depth_);
lane_group.lanes_->emplace_back(lane_group.depth_, id_);
Lane &lane = lane_group.lanes_->back();
lane.flags_ = prio_info.flags_;
}
Expand Down
Loading

0 comments on commit 73e3b76

Please sign in to comment.