Skip to content

Commit

Permalink
adds threaded version for block fetch; verification between the diffe…
Browse files Browse the repository at this point in the history
…rent modes but not verification with the old version
  • Loading branch information
Steve Plaza committed May 29, 2015
1 parent 3ab95ea commit 40148d4
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 64 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ if (NOT ${BUILDEM_DIR} STREQUAL "None")

set (LIBDVID_DEPS ${jsoncpp_NAME} ${libpng_NAME} ${libcurl_NAME}
${libjpeg_NAME} ${lz4_NAME} ${boost_NAME} ${numpy_NAME})
set (boost_LIBS ${BUILDEM_LIB_DIR}/libboost_system.${BUILDEM_PLATFORM_DYLIB_EXTENSION})
set (boost_LIBS ${BUILDEM_LIB_DIR}/libboost_thread.${BUILDEM_PLATFORM_DYLIB_EXTENSION} ${BUILDEM_LIB_DIR}/libboost_system.${BUILDEM_PLATFORM_DYLIB_EXTENSION})
if (LIBDVID_WRAP_PYTHON)
include (python)
# CMake's built-in PythonInterp module for the FIND_PACKAGE() macro uses
Expand All @@ -95,7 +95,7 @@ else ()
include_directories(${Boost_INCLUDE_DIR})

set (json_LIB jsoncpp)
set (boost_LIBS boost_system)
set (boost_LIBS boost_thread boost_system)
if (LIBDVID_WRAP_PYTHON)
set(boostpython_LIB boost_python)
endif()
Expand Down
2 changes: 1 addition & 1 deletion load_tests/loadtest_sparsegray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ int main(int argc, char** argv)
{
// call using the ND raw
ScopeTime overall_time;
blocks = libdvid::get_body_blocks(dvid_node, argv[3], argv[4], atoi(argv[5]), 1, false, 1);
blocks = libdvid::get_body_blocks(dvid_node, argv[3], argv[4], atoi(argv[5]), 2, false, 1);
}

{
Expand Down
3 changes: 2 additions & 1 deletion src/DVIDConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ int DVIDConnection::make_request(string endpoint, ConnectionMethod method,
headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
}
curl_easy_setopt(curl_connection, CURLOPT_HTTPHEADER, headers);

curl_easy_setopt(curl_connection, CURLOPT_NOSIGNAL, 1);

// load url
string url = get_uri_root() + endpoint;
curl_easy_setopt(curl_connection, CURLOPT_URL, url.c_str());
Expand Down
194 changes: 134 additions & 60 deletions src/DVIDThreadedFetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,40 @@
#include <libdvid/DVIDException.h>

#include <vector>
#include <boost/thread/thread.hpp>

using std::string;
using std::vector;

//! Max blocks to request at one time
static const int MAX_BLOCKS = 4096;


namespace libdvid {

vector<BinaryDataPtr> get_body_blocks(DVIDNodeService& service, string labelvol_name,
string grayscale_name, uint64 bodyid, int num_threads,
bool use_blocks, int request_efficiency)
{
vector<BlockXYZ> blockcoords;

if (!service.get_coarse_body(labelvol_name, bodyid, blockcoords)) {
throw ErrMsg("Body not found, no grayscale blocks could be retrieved");
}

int num_requests = 0;

vector<BinaryDataPtr> blocks;

uint8* blockdata = 0;
if ((request_efficiency == 1) && !use_blocks) {
blockdata = new uint8[DEFBLOCKSIZE*DEFBLOCKSIZE*DEFBLOCKSIZE];
}

// !! probably unnecessary copying going on
// iterate through block coords and call ND or blocks one by one or contig
int xmin;
int curr_runlength = 0;
for (unsigned int i = 0; i < blockcoords.size(); ++i) {
int z = blockcoords[i].z;
int y = blockcoords[i].y;
int x = blockcoords[i].x;
if (curr_runlength == 0) {
xmin = x;
}
curr_runlength += 1;

bool requestblocks = false;

if (request_efficiency == 0) {
// if fetching 1 by 1 always request
requestblocks = true;
} else if (curr_runlength == MAX_BLOCKS) {
// if there are too many blocks to fetch
requestblocks = true;
} else if (i == (blockcoords.size()-1)) {
// if there are no more blocks fetch
requestblocks = true;
} else if (i < (blockcoords.size()-1)) {
// if y or z are different or x is non-contiguous time to fetch
if ((blockcoords[i+1].z != z) || (blockcoords[i+1].y != y) ||
(((blockcoords[i+1].x)) != (x+1))) {
requestblocks = true;
}
struct FetchGrayBlocks {
FetchGrayBlocks(DVIDNodeService& service_, string grayscale_name_,
bool use_blocks_, int request_efficiency_, int start_, int count_,
vector<vector<int> >* spans_, vector<BinaryDataPtr>* blocks_) :
service(service_), grayscale_name(grayscale_name_),
use_blocks(use_blocks_), request_efficiency(request_efficiency_),
start(start_), count(count_), spans(spans_), blocks(blocks_) {}

void operator()()
{
uint8* blockdata = 0;
if ((request_efficiency == 1) && !use_blocks) {
blockdata = new uint8[DEFBLOCKSIZE*DEFBLOCKSIZE*DEFBLOCKSIZE];
}
// iterate only over the spans assigned to this thread: [start, start+count)
for (int index = start; index < (start+count); ++index) {
// load span info
vector<int> span = (*spans)[index];
int xmin = span[0];
int y = span[1];
int z = span[2];
int curr_runlength = span[3];
int block_index = span[4];

if (requestblocks) {
++num_requests;
if (use_blocks) {
// use block interface (currently most re-copy)
vector<int> block_coords;
Expand All @@ -74,7 +45,8 @@ vector<BinaryDataPtr> get_body_blocks(DVIDNodeService& service, string labelvol_
GrayscaleBlocks blocks2 = service.get_grayblocks(grayscale_name, block_coords, curr_runlength);
for (int j = 0; j < curr_runlength; ++j) {
BinaryDataPtr ptr = BinaryData::create_binary_data((const char*)blocks2[j], DEFBLOCKSIZE*DEFBLOCKSIZE*DEFBLOCKSIZE);
blocks.push_back(ptr);
(*blocks)[block_index] = ptr;
++block_index;
}
} else {
Dims_t dims;
Expand All @@ -88,13 +60,14 @@ vector<BinaryDataPtr> get_body_blocks(DVIDNodeService& service, string labelvol_

Grayscale3D grayvol = service.get_gray3D(grayscale_name,
dims, offset, false);

if (curr_runlength == 1) {
// do a simple copy for just one block
blocks.push_back(grayvol.get_binary());
(*blocks)[block_index] = grayvol.get_binary();
++block_index;
} else {
const uint8* raw_data = grayvol.get_raw();

// otherwise create a buffer and do something more complicated
for (int j = 0; j < curr_runlength; ++j) {
int offsetx = j * DEFBLOCKSIZE;
Expand All @@ -115,20 +88,121 @@ vector<BinaryDataPtr> get_body_blocks(DVIDNodeService& service, string labelvol_
}
}
BinaryDataPtr ptr = BinaryData::create_binary_data((const char*) blockdata, DEFBLOCKSIZE*DEFBLOCKSIZE*DEFBLOCKSIZE);
blocks.push_back(ptr);
(*blocks)[block_index] = ptr;
++block_index;
}
}
}
}

curr_runlength = 0;
if (blockdata) {
delete []blockdata;
}
}


DVIDNodeService service;
string grayscale_name;
bool use_blocks;
int request_efficiency;
int start; int count;
vector<vector<int> >* spans;
vector<BinaryDataPtr>* blocks;
};




/*!
 * Fetches the grayscale blocks that make up a body, issuing the DVID
 * requests from several worker threads.
 *
 * The coarse block list of the body is first collapsed into "spans".  A
 * span is one request and is encoded as five ints:
 * {xmin, y, z, run length, index of the span's first block in the output}.
 * The spans are then split into contiguous ranges, one range per worker
 * (FetchGrayBlocks).  Every worker writes only to the output slots named
 * by its own spans, so the workers never touch the same element.
 *
 * \param service node service used to look up the coarse body; it is also
 *        handed to every worker
 * \param labelvol_name name of the label volume that contains the body
 * \param grayscale_name name of the grayscale instance to read from
 * \param bodyid id of the body whose blocks are fetched
 * \param num_threads requested number of workers; values below 1 are
 *        treated as 1 and no more workers than requests are started
 * \param use_blocks forwarded to the workers (block interface when true,
 *        ND raw interface when false)
 * \param request_efficiency 0 issues one request per block; any other
 *        value merges blocks that are contiguous in x (same y and z) into
 *        one request of at most MAX_BLOCKS blocks
 * \return vector with one slot per block, in the order reported by
 *         get_coarse_body; the slots are filled by the workers
 * \throws ErrMsg if get_coarse_body reports that the body was not found
 */
vector<BinaryDataPtr> get_body_blocks(DVIDNodeService& service, string labelvol_name,
        string grayscale_name, uint64 bodyid, int num_threads,
        bool use_blocks, int request_efficiency)
{
    vector<BlockXYZ> blockcoords;
    vector<vector<int> > spans;

    if (!service.get_coarse_body(labelvol_name, bodyid, blockcoords)) {
        throw ErrMsg("Body not found, no grayscale blocks could be retrieved");
    }

    int num_requests = 0;

    vector<BinaryDataPtr> blocks;

    // walk the block coordinates and close a span whenever the current run
    // of blocks has to be sent as its own request
    int xmin = 0;
    int curr_runlength = 0;
    int start_index = 0;
    for (unsigned int i = 0; i < blockcoords.size(); ++i) {
        int z = blockcoords[i].z;
        int y = blockcoords[i].y;
        int x = blockcoords[i].x;
        if (curr_runlength == 0) {
            // first block of a new run
            xmin = x;
        }
        curr_runlength += 1;

        bool requestblocks = false;

        if (request_efficiency == 0) {
            // if fetching 1 by 1 always request
            requestblocks = true;
        } else if (curr_runlength == MAX_BLOCKS) {
            // if there are too many blocks to fetch
            requestblocks = true;
        } else if (i == (blockcoords.size()-1)) {
            // if there are no more blocks fetch
            requestblocks = true;
        } else if ((blockcoords[i+1].z != z) || (blockcoords[i+1].y != y) ||
                (blockcoords[i+1].x != (x+1))) {
            // if y or z are different or x is non-contiguous time to fetch
            requestblocks = true;
        }

        if (requestblocks) {
            ++num_requests;

            // record the span for the workers
            vector<int> span;
            span.push_back(xmin);
            span.push_back(y);
            span.push_back(z);
            span.push_back(curr_runlength);
            span.push_back(start_index);
            start_index += curr_runlength;
            spans.push_back(span);
            curr_runlength = 0;
        }
    }

    // one output slot per block; the workers fill the slots in place, so
    // the vector must not be resized while they run
    blocks.resize(start_index);

    // Nothing to launch for an empty body.  Skipping this section also
    // keeps the division below from ever seeing a zero thread count.
    if (num_requests > 0) {
        if (num_threads < 1) {
            num_threads = 1;
        }
        if (num_requests < num_threads) {
            num_threads = num_requests;
        }

        // every worker gets 'incr' spans and the first 'remainder' workers
        // take one extra, so no single worker absorbs the whole remainder
        const int incr = num_requests / num_threads;
        const int remainder = num_requests % num_threads;

        // launch threads
        boost::thread_group threads;
        int start = 0;
        for (int i = 0; i < num_threads; ++i) {
            const int count = incr + ((i < remainder) ? 1 : 0);
            threads.create_thread(FetchGrayBlocks(service, grayscale_name,
                    use_blocks, request_efficiency, start, count, &spans, &blocks));
            start += count;
        }
        threads.join_all();

        // every span must have been handed to exactly one worker
        assert(start == num_requests);
    }

    std::cout << "Performed " << num_requests << " requests" << std::endl;
    return blocks;
}
Expand Down
7 changes: 7 additions & 0 deletions todo
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
-parallelize body fetch (see if there is speedup in memory, out of memory) --
verify
-parallelize tile fetch (see if there is speedup in memory, out of memory) --
have option in current tile loadtest -- verify





-add more correct-by-compilation
Expand Down

0 comments on commit 40148d4

Please sign in to comment.