diff --git a/cpp/include/cuvs/neighbors/hnsw.h b/cpp/include/cuvs/neighbors/hnsw.h index a7597a939..d88fd3b4e 100644 --- a/cpp/include/cuvs/neighbors/hnsw.h +++ b/cpp/include/cuvs/neighbors/hnsw.h @@ -51,9 +51,9 @@ struct cuvsHnswIndexParams { /** Size of the candidate list during hierarchy construction when hierarchy is `CPU`*/ int ef_construction; /** Number of host threads to use to construct hierarchy when hierarchy is `CPU` - NOTE: Constructing the hierarchy when converting from a CAGRA graph is highly sensitive - to parallelism, and increasing the number of threads can reduce the quality of the index. - */ + When the value is 0, the number of threads is automatically set to the maximum + number of threads available. + */ int num_threads; }; @@ -158,8 +158,8 @@ cuvsError_t cuvsHnswExtendParamsDestroy(cuvsHnswExtendParams_t params); * NOTE: When hierarchy is: * 1. `NONE`: This method uses the filesystem to write the CAGRA index in * `/tmp/.bin` before reading it as an hnswlib index, then deleting the temporary - * file. The returned index is immutable and can only be searched by the hnswlib wrapper in cuVS, as - * the format is not compatible with the original hnswlib. + * file. The returned index is immutable and can only be searched by the hnswlib wrapper in cuVS, + * as the format is not compatible with the original hnswlib. * 2. `CPU`: The returned index is mutable and can be extended with additional vectors. The * serialized index is also compatible with the original hnswlib library. * @@ -364,10 +364,10 @@ cuvsError_t cuvsHnswSearch(cuvsResources_t res, /** * @brief Serialize a CAGRA index to a file as an hnswlib index - * NOTE: When hierarchy is `NONE`, the saved hnswlib index is immutable and can only be read by the - * hnswlib wrapper in cuVS, as the serialization format is not compatible with the original hnswlib. - * However, when hierarchy is `CPU`, the saved hnswlib index is compatible with the original hnswlib - * library. 
+ * NOTE: When hierarchy is `NONE`, the saved hnswlib index is immutable and can only be read by + * the hnswlib wrapper in cuVS, as the serialization format is not compatible with the original + * hnswlib. However, when hierarchy is `CPU`, the saved hnswlib index is compatible with the + * original hnswlib library. * * @param[in] res cuvsResources_t opaque C handle * @param[in] filename the name of the file to save the index @@ -406,8 +406,8 @@ cuvsError_t cuvsHnswSerialize(cuvsResources_t res, const char* filename, cuvsHns /** * Load hnswlib index from file which was serialized from a HNSW index. * NOTE: When hierarchy is `NONE`, the loaded hnswlib index is immutable, and only be read by the - * hnswlib wrapper in cuVS, as the serialization format is not compatible with the original hnswlib. - * Experimental, both the API and the serialization format are subject to change. + * hnswlib wrapper in cuVS, as the serialization format is not compatible with the original + * hnswlib. Experimental, both the API and the serialization format are subject to change. * * @code{.c} * #include diff --git a/cpp/include/cuvs/neighbors/hnsw.hpp b/cpp/include/cuvs/neighbors/hnsw.hpp index 81a823493..750f1f87f 100644 --- a/cpp/include/cuvs/neighbors/hnsw.hpp +++ b/cpp/include/cuvs/neighbors/hnsw.hpp @@ -54,10 +54,10 @@ struct index_params : cuvs::neighbors::index_params { /** Size of the candidate list during hierarchy construction when hierarchy is `CPU`*/ int ef_construction = 200; /** Number of host threads to use to construct hierarchy when hierarchy is `CPU` - NOTE: Constructing the hierarchy when converting from a CAGRA graph is highly sensitive - to parallelism, and increasing the number of threads can reduce the quality of the index. + When the value is 0, the number of threads is automatically set to the + maximum number of threads available. 
*/ - int num_threads = 2; + int num_threads = 0; }; /**@}*/ diff --git a/cpp/src/neighbors/detail/hnsw.hpp b/cpp/src/neighbors/detail/hnsw.hpp index 5447ae07a..07e012349 100644 --- a/cpp/src/neighbors/detail/hnsw.hpp +++ b/cpp/src/neighbors/detail/hnsw.hpp @@ -21,65 +21,13 @@ #include #include #include +#include #include #include #include namespace cuvs::neighbors::hnsw::detail { -// Multithreaded executor -// The helper function is copied from the hnswlib repository -// as for some reason, adding vectors to the hnswlib index does not -// work well with omp parallel for -template -inline void ParallelFor(size_t start, size_t end, size_t numThreads, Function fn) -{ - if (numThreads <= 0) { numThreads = std::thread::hardware_concurrency(); } - - if (numThreads == 1) { - for (size_t id = start; id < end; id++) { - fn(id, 0); - } - } else { - std::vector threads; - std::atomic current(start); - - // keep track of exceptions in threads - // https://stackoverflow.com/a/32428427/1713196 - std::exception_ptr lastException = nullptr; - std::mutex lastExceptMutex; - - for (size_t threadId = 0; threadId < numThreads; ++threadId) { - threads.push_back(std::thread([&, threadId] { - while (true) { - size_t id = current.fetch_add(1); - - if (id >= end) { break; } - - try { - fn(id, threadId); - } catch (...) { - std::unique_lock lastExcepLock(lastExceptMutex); - lastException = std::current_exception(); - /* - * This will work even when current is the largest value that - * size_t can fit, because fetch_add returns the previous value - * before the increment (what will result in overflow - * and produce 0 instead of current + 1). 
- */ - current = end; - break; - } - } - })); - } - for (auto& thread : threads) { - thread.join(); - } - if (lastException) { std::rethrow_exception(lastException); } - } -} - template struct hnsw_dist_t { using type = void; @@ -223,10 +171,12 @@ std::enable_if_t>> fro cagra_index.graph().extent(1) / 2, params.ef_construction); appr_algo->base_layer_init = false; // tell hnswlib to build upper layers only - ParallelFor(0, host_dataset_view.extent(0), params.num_threads, [&](size_t i, size_t threadId) { + auto num_threads = params.num_threads == 0 ? omp_get_max_threads() : params.num_threads; +#pragma omp parallel for num_threads(num_threads) + for (int64_t i = 0; i < host_dataset_view.extent(0); i++) { appr_algo->addPoint((void*)(host_dataset_view.data_handle() + i * host_dataset_view.extent(1)), i); - }); + } appr_algo->base_layer_init = true; // reset to true to allow addition of new points // move cagra graph to host @@ -242,11 +192,13 @@ std::enable_if_t>> fro // copy cagra graph to hnswlib base layer #pragma omp parallel for for (size_t i = 0; i < static_cast(host_graph.extent(0)); ++i) { - auto ll_i = appr_algo->get_linklist0(i); + auto hnsw_internal_id = appr_algo->label_lookup_.find(i)->second; + auto ll_i = appr_algo->get_linklist0(hnsw_internal_id); appr_algo->setListCount(ll_i, host_graph.extent(1)); auto* data = (uint32_t*)(ll_i + 1); for (size_t j = 0; j < static_cast(host_graph.extent(1)); ++j) { - data[j] = host_graph(i, j); + auto neighbor_internal_id = appr_algo->label_lookup_.find(host_graph(i, j))->second; + data[j] = neighbor_internal_id; } } @@ -281,19 +233,15 @@ void extend(raft::resources const& res, const_cast(idx.get_index())); auto current_element_count = hnswlib_index->getCurrentElementCount(); auto new_element_count = additional_dataset.extent(0); - auto num_threads = params.num_threads == 0 ? std::thread::hardware_concurrency() - : static_cast(params.num_threads); + auto num_threads = params.num_threads == 0 ? 
omp_get_max_threads() : params.num_threads; hnswlib_index->resizeIndex(current_element_count + new_element_count); - ParallelFor(current_element_count, - current_element_count + new_element_count, - num_threads, - [&](size_t i, size_t threadId) { - hnswlib_index->addPoint( - (void*)(additional_dataset.data_handle() + - (i - current_element_count) * additional_dataset.extent(1)), - i); - }); +#pragma omp parallel for num_threads(num_threads) + for (int64_t i = 0; i < additional_dataset.extent(0); i++) { + hnswlib_index->addPoint( + (void*)(additional_dataset.data_handle() + i * additional_dataset.extent(1)), + current_element_count + i); + } } template diff --git a/cpp/src/neighbors/hnsw_c.cpp b/cpp/src/neighbors/hnsw_c.cpp index 0233a510a..628d87e00 100644 --- a/cpp/src/neighbors/hnsw_c.cpp +++ b/cpp/src/neighbors/hnsw_c.cpp @@ -123,7 +123,7 @@ extern "C" cuvsError_t cuvsHnswIndexParamsCreate(cuvsHnswIndexParams_t* params) { return cuvs::core::translate_exceptions([=] { *params = new cuvsHnswIndexParams{ - .hierarchy = cuvsHnswHierarchy::NONE, .ef_construction = 200, .num_threads = 2}; + .hierarchy = cuvsHnswHierarchy::NONE, .ef_construction = 200, .num_threads = 0}; }); } diff --git a/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx b/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx index 4c44350e8..72a3617bd 100644 --- a/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx +++ b/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx @@ -52,12 +52,10 @@ cdef class IndexParams: ef_construction : int, default = 200 (optional) Maximum number of candidate list size used during construction when hierarchy is `cpu`. - num_threads : int, default = 2 (optional) + num_threads : int, default = 0 (optional) Number of CPU threads used to increase construction parallelism - when hierarchy is `cpu`. - NOTE: Constructing the hierarchy when converting from a CAGRA graph - is highly sensitive to parallelism, and increasing the number of - threads can reduce the quality of the index. + when hierarchy is `cpu`. 
When the value is 0, the number of threads is + automatically set to the maximum number of threads available. """ cdef cuvsHnswIndexParams* params @@ -71,7 +69,7 @@ cdef class IndexParams: def __init__(self, *, hierarchy="none", ef_construction=200, - num_threads=2): + num_threads=0): if hierarchy == "none": self.params.hierarchy = cuvsHnswHierarchy.NONE elif hierarchy == "cpu": diff --git a/python/cuvs/cuvs/tests/test_hnsw.py b/python/cuvs/cuvs/tests/test_hnsw.py index e00b88bb7..23a0920ef 100644 --- a/python/cuvs/cuvs/tests/test_hnsw.py +++ b/python/cuvs/cuvs/tests/test_hnsw.py @@ -54,7 +54,7 @@ def run_hnsw_build_search_test( assert index.trained - hnsw_params = hnsw.IndexParams(hierarchy=hierarchy, num_threads=1) + hnsw_params = hnsw.IndexParams(hierarchy=hierarchy) hnsw_index = hnsw.from_cagra(hnsw_params, index) queries = generate_data((n_queries, n_cols), dtype) @@ -135,7 +135,7 @@ def run_hnsw_extend_test( assert index.trained - hnsw_params = hnsw.IndexParams(hierarchy="cpu", num_threads=1) + hnsw_params = hnsw.IndexParams(hierarchy="cpu") hnsw_index = hnsw.from_cagra(hnsw_params, index) hnsw.extend(hnsw.ExtendParams(), hnsw_index, add_dataset) @@ -158,7 +158,6 @@ def run_hnsw_extend_test( skl_dist, skl_idx = nn_skl.kneighbors(queries, return_distance=True) recall = calc_recall(out_idx, skl_idx) - print(recall) assert recall > 0.95 diff --git a/python/cuvs_bench/cuvs_bench/config/algos/cuvs_cagra_hnswlib.yaml b/python/cuvs_bench/cuvs_bench/config/algos/cuvs_cagra_hnswlib.yaml index 90a561bca..630dc94ff 100644 --- a/python/cuvs_bench/cuvs_bench/config/algos/cuvs_cagra_hnswlib.yaml +++ b/python/cuvs_bench/cuvs_bench/config/algos/cuvs_cagra_hnswlib.yaml @@ -1,5 +1,6 @@ name: cuvs_cagra_hnswlib constraints: + build: cuvs_bench.config.algos.constraints.cuvs_cagra_build search: cuvs_bench.config.algos.constraints.hnswlib_search groups: base: @@ -9,6 +10,5 @@ groups: graph_build_algo: ["NN_DESCENT"] hierarchy: ["none", "cpu"] ef_construction: [64, 128, 256, 
512] - num_threads: [2, 5, 10] search: ef: [10, 20, 40, 60, 80, 120, 200, 400, 600, 800]