From c732a25fa940c015c09a4e3180a6084e03d25750 Mon Sep 17 00:00:00 2001
From: Joongi Kim
Date: Wed, 13 Jan 2016 15:51:10 +0900
Subject: [PATCH] refs #6: Change check watcher to prepare watcher and code cleanup.

* In libev, check watchers are called when there ARE pending events to
  execute, whereas prepare watchers are called when ev_run() is about to
  BLOCK because there are no pending events.  The point at which we need
  to flush any pending tasks is the latter, not the former.  In my tests,
  using check watchers occasionally caused performance fluctuations of
  more than 50% (about once per 5-10 seconds), while prepare watchers
  show no such symptom.  (A minimal libev sketch illustrating this timing
  difference is appended after the diff.)

* Removes the no-longer-used semi-coalesced copy code in offloadtask.cc.
---
 include/nba/framework/threadcontext.hh |   2 +-
 src/lib/elementgraph.cc                |   3 +-
 src/lib/io.cc                          |  14 ++-
 src/lib/offloadtask.cc                 | 135 ++-----------------------
 4 files changed, 20 insertions(+), 134 deletions(-)

diff --git a/include/nba/framework/threadcontext.hh b/include/nba/framework/threadcontext.hh
index 4a26564..51bb47e 100644
--- a/include/nba/framework/threadcontext.hh
+++ b/include/nba/framework/threadcontext.hh
@@ -185,7 +185,7 @@ public:
     struct rte_ring *task_completion_queue; /* to receive completed offload tasks */
     struct ev_async *task_completion_watcher;
-    struct ev_check *check_watcher;
+    struct ev_prepare *prepare_watcher;
 } __cache_aligned;

 struct coproc_thread_context {
diff --git a/src/lib/elementgraph.cc b/src/lib/elementgraph.cc
index 487cc67..2b2d42d 100644
--- a/src/lib/elementgraph.cc
+++ b/src/lib/elementgraph.cc
@@ -533,8 +533,7 @@ void ElementGraph::process_batch(PacketBatch *batch)

 void ElementGraph::process_offload_task(OffloadTask *otask)
 {
-    Element *current_elem = otask->tracker.element;
-    OffloadableElement *offloadable = dynamic_cast<OffloadableElement *>(current_elem);
+    OffloadableElement *offloadable = otask->elem;
     assert(offloadable->offload(this, otask, otask->tracker.input_port) == 0);
 }

diff --git a/src/lib/io.cc b/src/lib/io.cc
index 833e3ba..4dbb236 100644
--- a/src/lib/io.cc
+++ b/src/lib/io.cc
@@ -94,8 +94,14 @@ static void comp_task_init(struct rte_mempool *mp, void *arg, void *obj, unsigne
     OffloadTask *t = (OffloadTask *) obj;
     new (t) OffloadTask();
 }
-static void comp_check_cb(struct ev_loop *loop, struct ev_check *watcher, int revents)
+
+static void comp_prepare_cb(struct ev_loop *loop, struct ev_prepare *watcher, int revents)
 {
+    /* This routine is called when ev_run() is about to block
+     * (i.e., there are no outstanding events).
+     * Flushing any pending tasks here lets us release resources such as
+     * batch objects and lets other routines that wait for their release
+     * continue. */
     io_thread_context *io_ctx = (io_thread_context *) ev_userdata(loop);
     comp_thread_context *ctx = io_ctx->comp_ctx;
     ctx->elem_graph->flush_tasks();
@@ -734,10 +740,10 @@ int io_loop(void *arg)
     }

     /* Register per-iteration check event.
*/ - ctx->comp_ctx->check_watcher = (struct ev_check *) rte_malloc_socket(nullptr, sizeof(struct ev_check), + ctx->comp_ctx->prepare_watcher = (struct ev_prepare *) rte_malloc_socket(nullptr, sizeof(struct ev_prepare), CACHE_LINE_SIZE, ctx->loc.node_id); - ev_check_init(ctx->comp_ctx->check_watcher, comp_check_cb); - ev_check_start(ctx->loop, ctx->comp_ctx->check_watcher); + ev_prepare_init(ctx->comp_ctx->prepare_watcher, comp_prepare_cb); + ev_prepare_start(ctx->loop, ctx->comp_ctx->prepare_watcher); /* ==== END_OF_COMP ====*/ diff --git a/src/lib/offloadtask.cc b/src/lib/offloadtask.cc index d7e6897..3851a0a 100644 --- a/src/lib/offloadtask.cc +++ b/src/lib/offloadtask.cc @@ -19,9 +19,6 @@ using namespace std; using namespace nba; -#define COALESCED_COPY -#undef DEBUG_HOSTSIDE - static thread_local char dummy_buffer[NBA_MAX_PACKET_SIZE] = {0,}; OffloadTask::OffloadTask() @@ -77,14 +74,12 @@ OffloadTask::~OffloadTask() void OffloadTask::prepare_read_buffer() { - #ifdef COALESCED_COPY // write: host-to-device input // read: device-to-host output cctx->get_input_current_pos(io_base, &host_write_begin, &dev_write_begin); cctx->get_output_current_pos(io_base, &host_read_begin, &dev_read_begin); input_alloc_size_begin = cctx->get_input_size(io_base); output_alloc_size_begin = cctx->get_output_size(io_base); - #endif _debug_print_inb("at-beginning", nullptr, 0); _debug_print_outb("at-beginning", nullptr, 0); @@ -203,12 +198,6 @@ bool OffloadTask::copy_h2d() _debug_print_inb("copy_h2d.dbarray", nullptr, 0); assert(dbarray_h != nullptr); - #ifndef COALESCED_COPY - void *item_info_h = nullptr; - memory_t item_info_d; - size_t item_info_size = 0; - #endif - for (int dbid : datablocks) { int dbid_d = dbid_h2d[dbid]; dbarray_h[dbid_d].total_item_count_in = 0; @@ -230,13 +219,6 @@ bool OffloadTask::copy_h2d() /* We need to copy the size array because each item may * have different lengths. */ assert(t->aligned_item_sizes_h != nullptr); - #ifndef COALESCED_COPY - if (item_info_h == nullptr) { - item_info_h = t->aligned_item_sizes_h; - item_info_d = t->aligned_item_sizes_d; - } - item_info_size += ALIGN_CEIL(sizeof(struct item_size_info), CACHE_LINE_SIZE); - #endif dbarray_h[dbid_d].item_sizes_in[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr + (uintptr_t) offsetof(struct item_size_info, sizes)); dbarray_h[dbid_d].item_sizes_out[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr @@ -263,65 +245,7 @@ bool OffloadTask::copy_h2d() b++; } /* endfor(batches) */ } /* endfor(dbid) */ - - #ifndef COALESCED_COPY - cctx->enqueue_memwrite_op(item_info_h, item_info_d, 0, item_info_size); - // FIXME: hacking by knowing internal behaviour of cuda_mempool... - cctx->enqueue_memwrite_op(dbarray_h, dbarray_d, 0, dbarray_size); - #endif - has_h2d_copies = true; - - /* Coalesced H2D data copy. - * We need to check and copy not-yet-tranferred-to-GPU buffers one by - * one, but it causes high device API call overheads. - * We aggregate continuous copies to reduce the number of API calls. - * If there are no reused datablocks, all copies are shrinked into a - * single API call. */ - #ifndef COALESCED_COPY - void *first_host_in_ptr = nullptr; - memory_t first_dev_in_ptr; - size_t total_size = 0; - for (int dbid : datablocks) { - if (elemgraph->check_preproc(elem, dbid)) { - for (PacketBatch *batch : batches) { - struct datablock_tracker *t = &batch->datablock_states[dbid]; - if (t == nullptr || t->host_in_ptr == nullptr) { - if (first_host_in_ptr != nullptr) { - /* Discontinued copy. 
-                         */
-                        cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr,
-                                                  0, total_size);
-                        /* Reset. */
-                        first_host_in_ptr = nullptr;
-                        total_size = 0;
-                    }
-                    continue;
-                }
-                if (t->in_count == 0) assert(t->in_size == 0);
-                if (t->in_count > 0) assert(t->in_size > 0);
-                /* IMPORTANT: IO buffer allocations are aligned by cache line size!! */
-                if (first_host_in_ptr == nullptr) {
-                    first_host_in_ptr = t->host_in_ptr;
-                    first_dev_in_ptr = t->dev_in_ptr;
-                }
-                if ((char*) first_host_in_ptr + (uintptr_t) total_size
-                    != (char*) t->host_in_ptr)
-                {
-                    cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr,
-                                              0, total_size);
-                    first_host_in_ptr = t->host_in_ptr;
-                    total_size = ALIGN_CEIL(t->in_size, CACHE_LINE_SIZE);
-                } else {
-                    total_size += ALIGN_CEIL(t->in_size, CACHE_LINE_SIZE);
-                }
-            }
-        } /* endif(check_preproc) */
-    } /* endfor(dbid) */
-    if (first_host_in_ptr != nullptr) {
-        /* Finished copy. */
-        cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr, 0, total_size);
-    }
-    #endif

     return has_h2d_copies;
 }
@@ -377,12 +301,9 @@ void OffloadTask::execute()
         }
         batch_id ++;
     }
-    #ifdef COALESCED_COPY
+
     size_t last_alloc_size = cctx->get_input_size(io_base);
     cctx->enqueue_memwrite_op(host_write_begin, dev_write_begin, 0, last_alloc_size - input_alloc_size_begin);
-    #else
-    cctx->enqueue_memwrite_op(batch_ids_h, batch_ids_d, 0, ALIGN_CEIL(sizeof(uint16_t) * all_item_count, CACHE_LINE_SIZE) * 2);
-    #endif

     cctx->clear_checkbits();
     cctx->clear_kernel_args();
@@ -406,8 +327,13 @@ void OffloadTask::execute()
         arg = {(void *) &checkbits_d, sizeof(void *), alignof(void *)};
         cctx->push_kernel_arg(arg);

-        offload_compute_handler &handler = elem->offload_compute_handlers[cctx->type_name];
-        handler(cctx, &res);
+        //offload_compute_handler &handler = elem->offload_compute_handlers[cctx->type_name];
+        //handler(cctx, &res);
+        /* Skip kernel execution. */
+        res.num_workitems = 0;
+        res.num_threads_per_workgroup = 1;
+        res.num_workgroups = 1;
+        cctx->get_host_checkbits()[0] = 1;

     } else {
@@ -424,54 +350,9 @@ bool OffloadTask::copy_d2h()
     state = TASK_D2H_COPYING;

     /* Coalesced D2H data copy. */
-    #ifdef COALESCED_COPY
     size_t last_alloc_size = cctx->get_output_size(io_base);
     cctx->enqueue_memread_op(host_read_begin, dev_read_begin, 0,
                              last_alloc_size - output_alloc_size_begin);
-    #else
-    void *first_host_out_ptr = nullptr;
-    memory_t first_dev_out_ptr;
-    size_t total_size = 0;
-    for (int dbid : datablocks) {
-        if (elemgraph->check_postproc(elem, dbid)) {
-            DataBlock *db = comp_ctx->datablock_registry[dbid];
-            for (PacketBatch *batch : batches) {
-                struct datablock_tracker *t = &batch->datablock_states[dbid];
-                if (t == nullptr || t->host_out_ptr == nullptr
-                    || t->out_count == 0 || t->out_size == 0)
-                {
-                    if (first_host_out_ptr != nullptr) {
-                        /* Discontinued copy. */
-                        cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size);
-                        /* Reset.
-                         */
-                        first_host_out_ptr = nullptr;
-                        total_size = 0;
-                    }
-                    continue;
-                }
-                //if (t->out_count == 0) assert(t->out_size == 0);
-                //if (t->out_count > 0) assert(t->out_size > 0);
-                if (first_host_out_ptr == nullptr) {
-                    first_host_out_ptr = t->host_out_ptr;
-                    first_dev_out_ptr = t->dev_out_ptr;
-                }
-                if ((char*) first_host_out_ptr + (uintptr_t) total_size
-                    != (char*) t->host_out_ptr)
-                {
-                    cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size);
-                    first_host_out_ptr = t->host_out_ptr;
-                    total_size = ALIGN_CEIL(t->out_size, CACHE_LINE_SIZE);
-                } else {
-                    total_size += ALIGN_CEIL(t->out_size, CACHE_LINE_SIZE);
-                }
-            }
-        } /* endif(check_postproc) */
-    } /* endfor(dbid) */
-    if (first_host_out_ptr != nullptr) {
-        /* Finished copy. */
-        cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size);
-    }
-    #endif

     return true;
 }
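
Below is a minimal, self-contained libev sketch of the timing difference described
in the commit message; it is not part of the patch itself. It shows that the
ev_prepare callback runs just before ev_run() blocks waiting for new events, while
the ev_check callback runs right after the loop wakes up with pending events, before
their callbacks are dispatched. The file name, callback names, and the 0.5-second
timer are illustrative only; it assumes libev 4.x (ev_run/ev_break) and builds with
"cc watcher_demo.c -lev".

    /* watcher_demo.c -- shows when libev invokes prepare vs. check watchers. */
    #include <ev.h>
    #include <stdio.h>

    static int fired = 0;

    /* Invoked each time ev_run() is about to block and wait for new events. */
    static void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents)
    {
        puts("prepare: loop is about to block (flush pending work here)");
    }

    /* Invoked each time ev_run() wakes up with pending events, before their
     * callbacks run. */
    static void check_cb(struct ev_loop *loop, ev_check *w, int revents)
    {
        puts("check: loop woke up with pending events");
    }

    static void timer_cb(struct ev_loop *loop, ev_timer *w, int revents)
    {
        puts("timer: event handled");
        if (++fired == 3)
            ev_break(loop, EVBREAK_ALL);  /* stop the loop after three rounds */
    }

    int main(void)
    {
        struct ev_loop *loop = EV_DEFAULT;
        ev_prepare prep;
        ev_check chk;
        ev_timer tmr;

        ev_prepare_init(&prep, prepare_cb);
        ev_prepare_start(loop, &prep);
        ev_check_init(&chk, check_cb);
        ev_check_start(loop, &chk);
        ev_timer_init(&tmr, timer_cb, 0.5, 0.5);  /* periodic 0.5 s timer */
        ev_timer_start(loop, &tmr);

        ev_run(loop, 0);
        return 0;
    }

Running it prints "prepare" once per iteration right before the loop sleeps and
"check" right after it wakes up for the timer event, which is why flushing pending
offload tasks belongs in the prepare callback rather than the check callback.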