Fix indexing bug when using parallelism to build CPU hierarchy in HNSW (#620)

hnswlib uses an internal indexing system that atomically assigns an ID to each point in the order it is added to the index. When using parallelism to add points to the index, the internal ID may differ from the "label" of the point (the label, for us, is simply the index of the row in the dataset), because points added in parallel arrive in a non-deterministic order.

The bug was that I was using the label itself to write out the CPU hierarchy, when I should have been using hnswlib's internal ID for the point associated with that label.
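
For illustration, here is a minimal standalone sketch (not the cuVS code itself, and assuming plain hnswlib with an L2 space and a placeholder float dataset) of the distinction the fix relies on: the label is whatever the caller passes to addPoint, while the internal ID reflects insertion order, so base-layer link lists must be addressed through label_lookup_ rather than by label.

// Sketch only: label vs. hnswlib internal ID. Assumes hnswlib headers are on the
// include path; compile with OpenMP (e.g. -fopenmp) to get parallel insertion.
#include <hnswlib/hnswlib.h>

#include <cstddef>
#include <vector>

int main()
{
  const size_t dim = 8, n = 1000;
  std::vector<float> data(n * dim, 0.0f);  // placeholder dataset

  hnswlib::L2Space space(dim);
  hnswlib::HierarchicalNSW<float> index(&space, n, /*M=*/16, /*ef_construction=*/200);

  // Add points in parallel: internal IDs are assigned in completion order,
  // which generally differs from label order.
#pragma omp parallel for
  for (long long i = 0; i < static_cast<long long>(n); ++i) {
    index.addPoint(data.data() + i * dim, /*label=*/static_cast<size_t>(i));
  }

  // Buggy pattern: treating the label as if it were the internal ID.
  //   auto* links = index.get_linklist0(label);
  // Fixed pattern: translate the label through label_lookup_ first.
  size_t label     = 42;
  auto internal_id = index.label_lookup_.find(label)->second;
  auto* links      = index.get_linklist0(internal_id);
  (void)links;
  return 0;
}

This mirrors the change in cpp/src/neighbors/detail/hnsw.hpp below, where the base-layer writes now go through appr_algo->label_lookup_.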

Authors:
  - Divye Gala (https://github.com/divyegala)
  - Corey J. Nolet (https://github.com/cjnolet)

Approvers:
  - Corey J. Nolet (https://github.com/cjnolet)

URL: #620
divyegala authored Jan 30, 2025
1 parent 4ca47c9 commit 7609d18
Showing 7 changed files with 38 additions and 93 deletions.
22 changes: 11 additions & 11 deletions cpp/include/cuvs/neighbors/hnsw.h
@@ -51,9 +51,9 @@ struct cuvsHnswIndexParams {
/** Size of the candidate list during hierarchy construction when hierarchy is `CPU`*/
int ef_construction;
/** Number of host threads to use to construct hierarchy when hierarchy is `CPU`
NOTE: Constructing the hierarchy when converting from a CAGRA graph is highly sensitive
to parallelism, and increasing the number of threads can reduce the quality of the index.
*/
When the value is 0, the number of threads is automatically set to the maximum
number of threads available.
*/
int num_threads;
};

@@ -158,8 +158,8 @@ cuvsError_t cuvsHnswExtendParamsDestroy(cuvsHnswExtendParams_t params);
* NOTE: When hierarchy is:
* 1. `NONE`: This method uses the filesystem to write the CAGRA index in
* `/tmp/<random_number>.bin` before reading it as an hnswlib index, then deleting the temporary
* file. The returned index is immutable and can only be searched by the hnswlib wrapper in cuVS, as
* the format is not compatible with the original hnswlib.
* file. The returned index is immutable and can only be searched by the hnswlib wrapper in cuVS,
* as the format is not compatible with the original hnswlib.
* 2. `CPU`: The returned index is mutable and can be extended with additional vectors. The
* serialized index is also compatible with the original hnswlib library.
*
@@ -364,10 +364,10 @@ cuvsError_t cuvsHnswSearch(cuvsResources_t res,

/**
* @brief Serialize a CAGRA index to a file as an hnswlib index
* NOTE: When hierarchy is `NONE`, the saved hnswlib index is immutable and can only be read by the
* hnswlib wrapper in cuVS, as the serialization format is not compatible with the original hnswlib.
* However, when hierarchy is `CPU`, the saved hnswlib index is compatible with the original hnswlib
* library.
* NOTE: When hierarchy is `NONE`, the saved hnswlib index is immutable and can only be read by
* the hnswlib wrapper in cuVS, as the serialization format is not compatible with the original
* hnswlib. However, when hierarchy is `CPU`, the saved hnswlib index is compatible with the
* original hnswlib library.
*
* @param[in] res cuvsResources_t opaque C handle
* @param[in] filename the name of the file to save the index
@@ -406,8 +406,8 @@ cuvsError_t cuvsHnswSerialize(cuvsResources_t res, const char* filename, cuvsHns
/**
* Load hnswlib index from file which was serialized from a HNSW index.
* NOTE: When hierarchy is `NONE`, the loaded hnswlib index is immutable, and only be read by the
* hnswlib wrapper in cuVS, as the serialization format is not compatible with the original hnswlib.
* Experimental, both the API and the serialization format are subject to change.
* hnswlib wrapper in cuVS, as the serialization format is not compatible with the original
* hnswlib. Experimental, both the API and the serialization format are subject to change.
*
* @code{.c}
* #include <cuvs/core/c_api.h>
6 changes: 3 additions & 3 deletions cpp/include/cuvs/neighbors/hnsw.hpp
@@ -54,10 +54,10 @@ struct index_params : cuvs::neighbors::index_params {
/** Size of the candidate list during hierarchy construction when hierarchy is `CPU`*/
int ef_construction = 200;
/** Number of host threads to use to construct hierarchy when hierarchy is `CPU`
NOTE: Constructing the hierarchy when converting from a CAGRA graph is highly sensitive
to parallelism, and increasing the number of threads can reduce the quality of the index.
When the value is 0, the number of threads is automatically set to the
maximum number of threads available.
*/
int num_threads = 2;
int num_threads = 0;
};

/**@}*/
84 changes: 16 additions & 68 deletions cpp/src/neighbors/detail/hnsw.hpp
@@ -21,65 +21,13 @@
#include <hnswlib/hnswalg.h>
#include <hnswlib/hnswlib.h>
#include <memory>
#include <omp.h>
#include <raft/core/logger.hpp>
#include <random>
#include <thread>

namespace cuvs::neighbors::hnsw::detail {

// Multithreaded executor
// The helper function is copied from the hnswlib repository
// as for some reason, adding vectors to the hnswlib index does not
// work well with omp parallel for
template <class Function>
inline void ParallelFor(size_t start, size_t end, size_t numThreads, Function fn)
{
if (numThreads <= 0) { numThreads = std::thread::hardware_concurrency(); }

if (numThreads == 1) {
for (size_t id = start; id < end; id++) {
fn(id, 0);
}
} else {
std::vector<std::thread> threads;
std::atomic<size_t> current(start);

// keep track of exceptions in threads
// https://stackoverflow.com/a/32428427/1713196
std::exception_ptr lastException = nullptr;
std::mutex lastExceptMutex;

for (size_t threadId = 0; threadId < numThreads; ++threadId) {
threads.push_back(std::thread([&, threadId] {
while (true) {
size_t id = current.fetch_add(1);

if (id >= end) { break; }

try {
fn(id, threadId);
} catch (...) {
std::unique_lock<std::mutex> lastExcepLock(lastExceptMutex);
lastException = std::current_exception();
/*
* This will work even when current is the largest value that
* size_t can fit, because fetch_add returns the previous value
* before the increment (what will result in overflow
* and produce 0 instead of current + 1).
*/
current = end;
break;
}
}
}));
}
for (auto& thread : threads) {
thread.join();
}
if (lastException) { std::rethrow_exception(lastException); }
}
}

template <typename T>
struct hnsw_dist_t {
using type = void;
@@ -223,10 +171,12 @@ std::enable_if_t<hierarchy == HnswHierarchy::CPU, std::unique_ptr<index<T>>> fro
cagra_index.graph().extent(1) / 2,
params.ef_construction);
appr_algo->base_layer_init = false; // tell hnswlib to build upper layers only
ParallelFor(0, host_dataset_view.extent(0), params.num_threads, [&](size_t i, size_t threadId) {
auto num_threads = params.num_threads == 0 ? omp_get_max_threads() : params.num_threads;
#pragma omp parallel for num_threads(num_threads)
for (int64_t i = 0; i < host_dataset_view.extent(0); i++) {
appr_algo->addPoint((void*)(host_dataset_view.data_handle() + i * host_dataset_view.extent(1)),
i);
});
}
appr_algo->base_layer_init = true; // reset to true to allow addition of new points

// move cagra graph to host
@@ -242,11 +192,13 @@ std::enable_if_t<hierarchy == HnswHierarchy::CPU, std::unique_ptr<index<T>>> fro
// copy cagra graph to hnswlib base layer
#pragma omp parallel for
for (size_t i = 0; i < static_cast<size_t>(host_graph.extent(0)); ++i) {
auto ll_i = appr_algo->get_linklist0(i);
auto hnsw_internal_id = appr_algo->label_lookup_.find(i)->second;
auto ll_i = appr_algo->get_linklist0(hnsw_internal_id);
appr_algo->setListCount(ll_i, host_graph.extent(1));
auto* data = (uint32_t*)(ll_i + 1);
for (size_t j = 0; j < static_cast<size_t>(host_graph.extent(1)); ++j) {
data[j] = host_graph(i, j);
auto neighbor_internal_id = appr_algo->label_lookup_.find(host_graph(i, j))->second;
data[j] = neighbor_internal_id;
}
}

@@ -281,19 +233,15 @@ void extend(raft::resources const& res,
const_cast<void*>(idx.get_index()));
auto current_element_count = hnswlib_index->getCurrentElementCount();
auto new_element_count = additional_dataset.extent(0);
auto num_threads = params.num_threads == 0 ? std::thread::hardware_concurrency()
: static_cast<size_t>(params.num_threads);
auto num_threads = params.num_threads == 0 ? omp_get_max_threads() : params.num_threads;

hnswlib_index->resizeIndex(current_element_count + new_element_count);
ParallelFor(current_element_count,
current_element_count + new_element_count,
num_threads,
[&](size_t i, size_t threadId) {
hnswlib_index->addPoint(
(void*)(additional_dataset.data_handle() +
(i - current_element_count) * additional_dataset.extent(1)),
i);
});
#pragma omp parallel for num_threads(num_threads)
for (int64_t i = 0; i < additional_dataset.extent(0); i++) {
hnswlib_index->addPoint(
(void*)(additional_dataset.data_handle() + i * additional_dataset.extent(1)),
current_element_count + i);
}
}

template <typename T>
2 changes: 1 addition & 1 deletion cpp/src/neighbors/hnsw_c.cpp
@@ -123,7 +123,7 @@ extern "C" cuvsError_t cuvsHnswIndexParamsCreate(cuvsHnswIndexParams_t* params)
{
return cuvs::core::translate_exceptions([=] {
*params = new cuvsHnswIndexParams{
.hierarchy = cuvsHnswHierarchy::NONE, .ef_construction = 200, .num_threads = 2};
.hierarchy = cuvsHnswHierarchy::NONE, .ef_construction = 200, .num_threads = 0};
});
}

10 changes: 4 additions & 6 deletions python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx
@@ -52,12 +52,10 @@ cdef class IndexParams:
ef_construction : int, default = 200 (optional)
Maximum number of candidate list size used during construction
when hierarchy is `cpu`.
num_threads : int, default = 2 (optional)
num_threads : int, default = 0 (optional)
Number of CPU threads used to increase construction parallelism
when hierarchy is `cpu`.
NOTE: Constructing the hierarchy when converting from a CAGRA graph
is highly sensitive to parallelism, and increasing the number of
threads can reduce the quality of the index.
when hierarchy is `cpu`. When the value is 0, the number of threads is
automatically set to the maximum number of threads available.
"""

cdef cuvsHnswIndexParams* params
@@ -71,7 +69,7 @@ cdef class IndexParams:
def __init__(self, *,
hierarchy="none",
ef_construction=200,
num_threads=2):
num_threads=0):
if hierarchy == "none":
self.params.hierarchy = cuvsHnswHierarchy.NONE
elif hierarchy == "cpu":
5 changes: 2 additions & 3 deletions python/cuvs/cuvs/tests/test_hnsw.py
@@ -54,7 +54,7 @@ def run_hnsw_build_search_test(

assert index.trained

hnsw_params = hnsw.IndexParams(hierarchy=hierarchy, num_threads=1)
hnsw_params = hnsw.IndexParams(hierarchy=hierarchy)
hnsw_index = hnsw.from_cagra(hnsw_params, index)

queries = generate_data((n_queries, n_cols), dtype)
Expand Down Expand Up @@ -135,7 +135,7 @@ def run_hnsw_extend_test(

assert index.trained

hnsw_params = hnsw.IndexParams(hierarchy="cpu", num_threads=1)
hnsw_params = hnsw.IndexParams(hierarchy="cpu")
hnsw_index = hnsw.from_cagra(hnsw_params, index)
hnsw.extend(hnsw.ExtendParams(), hnsw_index, add_dataset)

@@ -158,7 +158,6 @@ def run_hnsw_extend_test(
skl_dist, skl_idx = nn_skl.kneighbors(queries, return_distance=True)

recall = calc_recall(out_idx, skl_idx)
print(recall)
assert recall > 0.95


@@ -1,5 +1,6 @@
name: cuvs_cagra_hnswlib
constraints:
build: cuvs_bench.config.algos.constraints.cuvs_cagra_build
search: cuvs_bench.config.algos.constraints.hnswlib_search
groups:
base:
@@ -9,6 +10,5 @@ groups:
graph_build_algo: ["NN_DESCENT"]
hierarchy: ["none", "cpu"]
ef_construction: [64, 128, 256, 512]
num_threads: [2, 5, 10]
search:
ef: [10, 20, 40, 60, 80, 120, 200, 400, 600, 800]
