refs #6: Change check watcher to prepare watcher and clean up code.
 * In libev, check watchers are called when there ARE pending events to
   execute, while prepare watchers are called when ev_run() is about to
   BLOCK because there are no pending events.
   The moment when we need to flush any pending tasks is the latter,
   not the former. (A minimal libev sketch illustrating the difference
   follows below.)

   In my tests, using check watchers occasionally caused performance
   fluctuations of more than 50% (about once every 5-10 seconds), while
   prepare watchers showed no such symptom.

 * Removes the no-longer-used semi-coalesced copy code in offloadtask.cc.
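To make the behavioural difference concrete, here is a minimal, standalone libev sketch (illustrative only, not NBA code) that registers both watcher types next to a repeating timer: the prepare callback fires just before each blocking poll, while the check callback fires right after the loop wakes up with events to dispatch.

```cpp
// Minimal libev sketch (not part of NBA): compare when ev_prepare and
// ev_check callbacks fire. Build with: g++ watchers.cc -lev
#include <ev.h>
#include <cstdio>

static void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents)
{
    // Runs just before ev_run() blocks waiting for events --
    // the point where NBA now flushes pending offload tasks.
    std::puts("prepare: about to block");
}

static void check_cb(struct ev_loop *loop, ev_check *w, int revents)
{
    // Runs right after the loop has polled and gathered events,
    // i.e. when there already IS pending work to dispatch.
    std::puts("check: woke up with pending events");
}

static void timer_cb(struct ev_loop *loop, ev_timer *w, int revents)
{
    std::puts("timer: event handled");
}

int main()
{
    struct ev_loop *loop = EV_DEFAULT;
    ev_prepare pw;
    ev_check cw;
    ev_timer tw;

    ev_prepare_init(&pw, prepare_cb);
    ev_check_init(&cw, check_cb);
    ev_timer_init(&tw, timer_cb, 0.5, 0.5);   // repeat every 0.5 s

    ev_prepare_start(loop, &pw);
    ev_check_start(loop, &cw);
    ev_timer_start(loop, &tw);

    ev_run(loop, 0);   // prints prepare / check / timer once per iteration
    return 0;
}
```

Per libev's documented loop order, check watchers are queued together with that iteration's I/O and timer events, so a flush hooked there runs while work is still pending; a prepare watcher runs only when the loop is about to go idle, which matches the flush semantics this commit wants.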
achimnol committed Jan 13, 2016
1 parent 26922c9 commit c732a25
Showing 4 changed files with 20 additions and 134 deletions.
2 changes: 1 addition & 1 deletion include/nba/framework/threadcontext.hh
@@ -185,7 +185,7 @@ public:

struct rte_ring *task_completion_queue; /* to receive completed offload tasks */
struct ev_async *task_completion_watcher;
struct ev_check *check_watcher;
struct ev_prepare *prepare_watcher;
} __cache_aligned;

struct coproc_thread_context {
3 changes: 1 addition & 2 deletions src/lib/elementgraph.cc
@@ -533,8 +533,7 @@ void ElementGraph::process_batch(PacketBatch *batch)

void ElementGraph::process_offload_task(OffloadTask *otask)
{
Element *current_elem = otask->tracker.element;
OffloadableElement *offloadable = dynamic_cast<OffloadableElement*>(current_elem);
OffloadableElement *offloadable = otask->elem;
assert(offloadable->offload(this, otask, otask->tracker.input_port) == 0);
}

14 changes: 10 additions & 4 deletions src/lib/io.cc
@@ -94,8 +94,14 @@ static void comp_task_init(struct rte_mempool *mp, void *arg, void *obj, unsigned
OffloadTask *t = (OffloadTask *) obj;
new (t) OffloadTask();
}
static void comp_check_cb(struct ev_loop *loop, struct ev_check *watcher, int revents)

static void comp_prepare_cb(struct ev_loop *loop, struct ev_prepare *watcher, int revents)
{
/* This routine is called when ev_run() is about to block
 * (i.e., there are no outstanding events).
 * Flushing here lets us check any pending tasks so that we can
 * eventually release resources such as batch objects and let other
 * routines waiting for their release continue. */
io_thread_context *io_ctx = (io_thread_context *) ev_userdata(loop);
comp_thread_context *ctx = io_ctx->comp_ctx;
ctx->elem_graph->flush_tasks();
@@ -734,10 +740,10 @@ int io_loop(void *arg)
}

/* Register per-iteration prepare event. */
ctx->comp_ctx->check_watcher = (struct ev_check *) rte_malloc_socket(nullptr, sizeof(struct ev_check),
ctx->comp_ctx->prepare_watcher = (struct ev_prepare *) rte_malloc_socket(nullptr, sizeof(struct ev_prepare),
CACHE_LINE_SIZE, ctx->loc.node_id);
ev_check_init(ctx->comp_ctx->check_watcher, comp_check_cb);
ev_check_start(ctx->loop, ctx->comp_ctx->check_watcher);
ev_prepare_init(ctx->comp_ctx->prepare_watcher, comp_prepare_cb);
ev_prepare_start(ctx->loop, ctx->comp_ctx->prepare_watcher);

/* ==== END_OF_COMP ====*/

135 changes: 8 additions & 127 deletions src/lib/offloadtask.cc
@@ -19,9 +19,6 @@
using namespace std;
using namespace nba;

#define COALESCED_COPY
#undef DEBUG_HOSTSIDE

static thread_local char dummy_buffer[NBA_MAX_PACKET_SIZE] = {0,};

OffloadTask::OffloadTask()
@@ -77,14 +74,12 @@ OffloadTask::~OffloadTask()

void OffloadTask::prepare_read_buffer()
{
#ifdef COALESCED_COPY
// write: host-to-device input
// read: device-to-host output
cctx->get_input_current_pos(io_base, &host_write_begin, &dev_write_begin);
cctx->get_output_current_pos(io_base, &host_read_begin, &dev_read_begin);
input_alloc_size_begin = cctx->get_input_size(io_base);
output_alloc_size_begin = cctx->get_output_size(io_base);
#endif
_debug_print_inb("at-beginning", nullptr, 0);
_debug_print_outb("at-beginning", nullptr, 0);

@@ -203,12 +198,6 @@ bool OffloadTask::copy_h2d()
_debug_print_inb("copy_h2d.dbarray", nullptr, 0);
assert(dbarray_h != nullptr);

#ifndef COALESCED_COPY
void *item_info_h = nullptr;
memory_t item_info_d;
size_t item_info_size = 0;
#endif

for (int dbid : datablocks) {
int dbid_d = dbid_h2d[dbid];
dbarray_h[dbid_d].total_item_count_in = 0;
@@ -230,13 +219,6 @@ bool OffloadTask::copy_h2d()
/* We need to copy the size array because each item may
* have different lengths. */
assert(t->aligned_item_sizes_h != nullptr);
#ifndef COALESCED_COPY
if (item_info_h == nullptr) {
item_info_h = t->aligned_item_sizes_h;
item_info_d = t->aligned_item_sizes_d;
}
item_info_size += ALIGN_CEIL(sizeof(struct item_size_info), CACHE_LINE_SIZE);
#endif
dbarray_h[dbid_d].item_sizes_in[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr
+ (uintptr_t) offsetof(struct item_size_info, sizes));
dbarray_h[dbid_d].item_sizes_out[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr
@@ -263,65 +245,7 @@ bool OffloadTask::copy_h2d()
b++;
} /* endfor(batches) */
} /* endfor(dbid) */

#ifndef COALESCED_COPY
cctx->enqueue_memwrite_op(item_info_h, item_info_d, 0, item_info_size);
// FIXME: this hack relies on knowing the internal behaviour of cuda_mempool...
cctx->enqueue_memwrite_op(dbarray_h, dbarray_d, 0, dbarray_size);
#endif

has_h2d_copies = true;

/* Coalesced H2D data copy.
 * We need to check and copy the buffers not yet transferred to the GPU
 * one by one, but doing so incurs high device API call overheads.
 * We aggregate contiguous copies to reduce the number of API calls.
 * If there are no reused datablocks, all copies collapse into a
 * single API call. */
#ifndef COALESCED_COPY
void *first_host_in_ptr = nullptr;
memory_t first_dev_in_ptr;
size_t total_size = 0;
for (int dbid : datablocks) {
if (elemgraph->check_preproc(elem, dbid)) {
for (PacketBatch *batch : batches) {
struct datablock_tracker *t = &batch->datablock_states[dbid];
if (t == nullptr || t->host_in_ptr == nullptr) {
if (first_host_in_ptr != nullptr) {
/* Discontinued copy. */
cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr,
0, total_size);
/* Reset. */
first_host_in_ptr = nullptr;
total_size = 0;
}
continue;
}
if (t->in_count == 0) assert(t->in_size == 0);
if (t->in_count > 0) assert(t->in_size > 0);
/* IMPORTANT: IO buffer allocations are aligned by cache line size!! */
if (first_host_in_ptr == nullptr) {
first_host_in_ptr = t->host_in_ptr;
first_dev_in_ptr = t->dev_in_ptr;
}
if ((char*) first_host_in_ptr + (uintptr_t) total_size
!= (char*) t->host_in_ptr)
{
cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr,
0, total_size);
first_host_in_ptr = t->host_in_ptr;
total_size = ALIGN_CEIL(t->in_size, CACHE_LINE_SIZE);
} else {
total_size += ALIGN_CEIL(t->in_size, CACHE_LINE_SIZE);
}
}
} /* endif(check_preproc) */
} /* endfor(dbid) */
if (first_host_in_ptr != nullptr) {
/* Finished copy. */
cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr, 0, total_size);
}
#endif
return has_h2d_copies;
}
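For context on the block removed above: the deleted `#ifndef COALESCED_COPY` path implemented the coalescing its comment describes by walking the per-batch host buffers in order and merging runs of contiguous pointers into single enqueue_memwrite_op() calls, while the retained path simply copies the whole bump-allocated region in one call. Below is a self-contained sketch of the merging idea only; Chunk, issue_copy(), and coalesced_copy() are hypothetical names used for illustration and are not NBA's API (cache-line alignment of sizes is omitted for brevity).

```cpp
// Sketch of run-merging for contiguous chunks so fewer device-copy calls
// are issued. issue_copy() stands in for a driver call such as
// cudaMemcpyAsync(); it is NOT NBA's cctx->enqueue_memwrite_op().
#include <cstdio>
#include <cstddef>
#include <vector>

struct Chunk {
    size_t offset;   // offset inside one contiguous staging buffer
    size_t size;     // bytes to transfer (0 means nothing to copy)
};

static void issue_copy(size_t offset, size_t size)
{
    std::printf("copy [%zu, %zu) in one call\n", offset, offset + size);
}

// Chunks are assumed to be sorted by offset, mirroring buffers that were
// bump-allocated in order.
static void coalesced_copy(const std::vector<Chunk> &chunks)
{
    bool open = false;
    size_t begin = 0, length = 0;
    for (const Chunk &c : chunks) {
        if (c.size == 0) {                         // empty chunk: flush the run
            if (open) { issue_copy(begin, length); open = false; }
            continue;
        }
        if (open && begin + length == c.offset) {
            length += c.size;                      // contiguous: extend the run
        } else {
            if (open) issue_copy(begin, length);   // discontinuity: flush
            begin = c.offset;                      // and start a new run
            length = c.size;
            open = true;
        }
    }
    if (open) issue_copy(begin, length);           // flush the trailing run
}

int main()
{
    // Three contiguous chunks plus one after a gap collapse into
    // two copy calls instead of four.
    coalesced_copy({{0, 64}, {64, 128}, {192, 64}, {512, 256}});
    return 0;
}
```

With contiguous bump allocation and no reused datablocks, every chunk extends the current run and the loop degenerates into a single copy call, which is what the surviving coalesced path exploits by copying the whole [host_write_begin, current) region at once.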

@@ -377,12 +301,9 @@ void OffloadTask::execute()
}
batch_id ++;
}
#ifdef COALESCED_COPY

size_t last_alloc_size = cctx->get_input_size(io_base);
cctx->enqueue_memwrite_op(host_write_begin, dev_write_begin, 0, last_alloc_size - input_alloc_size_begin);
#else
cctx->enqueue_memwrite_op(batch_ids_h, batch_ids_d, 0, ALIGN_CEIL(sizeof(uint16_t) * all_item_count, CACHE_LINE_SIZE) * 2);
#endif

cctx->clear_checkbits();
cctx->clear_kernel_args();
@@ -406,8 +327,13 @@ void OffloadTask::execute()
arg = {(void *) &checkbits_d, sizeof(void *), alignof(void *)};
cctx->push_kernel_arg(arg);

offload_compute_handler &handler = elem->offload_compute_handlers[cctx->type_name];
handler(cctx, &res);
//offload_compute_handler &handler = elem->offload_compute_handlers[cctx->type_name];
//handler(cctx, &res);
/* Skip kernel execution. */
res.num_workitems = 0;
res.num_threads_per_workgroup = 1;
res.num_workgroups = 1;
cctx->get_host_checkbits()[0] = 1;

} else {

@@ -424,54 +350,9 @@ bool OffloadTask::copy_d2h()
state = TASK_D2H_COPYING;

/* Coalesced D2H data copy. */
#ifdef COALESCED_COPY
size_t last_alloc_size = cctx->get_output_size(io_base);
cctx->enqueue_memread_op(host_read_begin, dev_read_begin,
0, last_alloc_size - output_alloc_size_begin);
#else
void *first_host_out_ptr = nullptr;
memory_t first_dev_out_ptr;
size_t total_size = 0;
for (int dbid : datablocks) {
if (elemgraph->check_postproc(elem, dbid)) {
DataBlock *db = comp_ctx->datablock_registry[dbid];
for (PacketBatch *batch : batches) {
struct datablock_tracker *t = &batch->datablock_states[dbid];
if (t == nullptr || t->host_out_ptr == nullptr
|| t->out_count == 0 || t->out_size == 0)
{
if (first_host_out_ptr != nullptr) {
/* Discontinued copy. */
cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size);
/* Reset. */
first_host_out_ptr = nullptr;
total_size = 0;
}
continue;
}
//if (t->out_count == 0) assert(t->out_size == 0);
//if (t->out_count > 0) assert(t->out_size > 0);
if (first_host_out_ptr == nullptr) {
first_host_out_ptr = t->host_out_ptr;
first_dev_out_ptr = t->dev_out_ptr;
}
if ((char*) first_host_out_ptr + (uintptr_t) total_size
!= (char*) t->host_out_ptr)
{
cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size);
first_host_out_ptr = t->host_out_ptr;
total_size = ALIGN_CEIL(t->out_size, CACHE_LINE_SIZE);
} else {
total_size += ALIGN_CEIL(t->out_size, CACHE_LINE_SIZE);
}
}
} /* endif(check_postproc) */
} /* endfor(dbid) */
if (first_host_out_ptr != nullptr) {
/* Finished copy. */
cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size);
}
#endif
return true;
}

