From 862f75aa14d5117c965b1bb50e5d4f7d454babb8 Mon Sep 17 00:00:00 2001 From: Shawn Wang Date: Thu, 2 Jan 2025 19:27:54 +0800 Subject: [PATCH] sparse: add block-max wand and block-max maxscore algorithm Signed-off-by: Shawn Wang --- include/knowhere/comp/index_param.h | 1 + include/knowhere/sparse_utils.h | 10 + src/index/sparse/sparse_index_node.cc | 25 + src/index/sparse/sparse_inverted_index.h | 482 +++++++++++++++++- .../sparse/sparse_inverted_index_config.h | 15 +- tests/ut/test_sparse.cc | 12 +- 6 files changed, 518 insertions(+), 27 deletions(-) diff --git a/include/knowhere/comp/index_param.h b/include/knowhere/comp/index_param.h index 4e0632c2c..1da725912 100644 --- a/include/knowhere/comp/index_param.h +++ b/include/knowhere/comp/index_param.h @@ -178,6 +178,7 @@ constexpr const char* PRQ_NUM = "nrq"; // for PRQ, number of redisual quant constexpr const char* INVERTED_INDEX_ALGO = "inverted_index_algo"; constexpr const char* DROP_RATIO_BUILD = "drop_ratio_build"; constexpr const char* DROP_RATIO_SEARCH = "drop_ratio_search"; +constexpr const char* BLOCKMAX_BLOCK_SIZE = "blockmax_block_size"; } // namespace indexparam using MetricType = std::string; diff --git a/include/knowhere/sparse_utils.h b/include/knowhere/sparse_utils.h index 9f47df999..9accd179b 100644 --- a/include/knowhere/sparse_utils.h +++ b/include/knowhere/sparse_utils.h @@ -391,6 +391,16 @@ class GrowableVectorView { return reinterpret_cast(mmap_data_)[i]; } + T& + back() { + return reinterpret_cast(mmap_data_)[size() - 1]; + } + + const T& + back() const { + return reinterpret_cast(mmap_data_)[size() - 1]; + } + class iterator : public boost::iterator_facade { public: iterator() = default; diff --git a/src/index/sparse/sparse_index_node.cc b/src/index/sparse/sparse_index_node.cc index eb9e84858..49e3c3338 100644 --- a/src/index/sparse/sparse_index_node.cc +++ b/src/index/sparse/sparse_index_node.cc @@ -377,6 +377,20 @@ class SparseInvertedIndexNode : public IndexNode { sparse::SparseMetricType::METRIC_BM25); index->SetBM25Params(k1, b, avgdl); return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_WAND") { + auto index = + new sparse::InvertedIndex( + sparse::SparseMetricType::METRIC_BM25); + index->SetBM25Params(k1, b, avgdl); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_MAXSCORE") { + auto index = + new sparse::InvertedIndex( + sparse::SparseMetricType::METRIC_BM25); + index->SetBM25Params(k1, b, avgdl); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; } else if (cfg.inverted_index_algo.value() == "DAAT_MAXSCORE") { auto index = new sparse::InvertedIndex( sparse::SparseMetricType::METRIC_BM25); @@ -400,6 +414,17 @@ class SparseInvertedIndexNode : public IndexNode { auto index = new sparse::InvertedIndex( sparse::SparseMetricType::METRIC_IP); return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_WAND") { + auto index = new sparse::InvertedIndex( + sparse::SparseMetricType::METRIC_IP); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_MAXSCORE") { + auto index = + new sparse::InvertedIndex( + sparse::SparseMetricType::METRIC_IP); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; } else if (cfg.inverted_index_algo.value() == "TAAT_NAIVE") { auto index = new sparse::InvertedIndex( sparse::SparseMetricType::METRIC_IP); diff --git a/src/index/sparse/sparse_inverted_index.h b/src/index/sparse/sparse_inverted_index.h index 4e7822464..41d7e2ba1 100644 --- a/src/index/sparse/sparse_inverted_index.h +++ b/src/index/sparse/sparse_inverted_index.h @@ -14,9 +14,11 @@ #include #include +#include #include #include +#include #include #include #include @@ -39,6 +41,8 @@ enum class InvertedIndexAlgo { TAAT_NAIVE, DAAT_WAND, DAAT_MAXSCORE, + DAAT_BLOCKMAX_WAND, + DAAT_BLOCKMAX_MAXSCORE, }; struct InvertedIndexApproxSearchParams { @@ -111,6 +115,19 @@ class InvertedIndex : public BaseInvertedIndex { close(map_fd_); map_fd_ = -1; } + if (blockmax_map_ != nullptr) { + auto res = munmap(blockmax_map_, blockmax_map_byte_size_); + if (res != 0) { + LOG_KNOWHERE_ERROR_ << "Failed to munmap when deleting sparse InvertedIndex: " << strerror(errno); + } + blockmax_map_ = nullptr; + blockmax_map_byte_size_ = 0; + } + if (blockmax_map_fd_ != -1) { + // closing the file descriptor will also cause the file to be deleted. + close(blockmax_map_fd_); + blockmax_map_fd_ = -1; + } } } @@ -122,6 +139,11 @@ class InvertedIndex : public BaseInvertedIndex { bm25_params_ = std::make_unique(k1, b, avgdl); } + void + SetBlockmaxBlockSize(size_t block_size) { + blockmax_block_size_ = block_size; + } + expected> GetDocValueComputer(const SparseInvertedIndexConfig& cfg) const override { // if metric_type is set in config, it must match with how the index was built. @@ -145,13 +167,12 @@ class InvertedIndex : public BaseInvertedIndex { "avgdl must be supplied during searching"); } auto avgdl = cfg.bm25_avgdl.value(); - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { - // daat_wand and daat_maxscore: search time k1/b must equal load time config. + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { + // daat related algorithms: search time k1/b must equal load time config. if ((cfg.bm25_k1.has_value() && cfg.bm25_k1.value() != bm25_params_->k1) || ((cfg.bm25_b.has_value() && cfg.bm25_b.value() != bm25_params_->b))) { return expected>::Err( - Status::invalid_args, - "search time k1/b must equal load time config for DAAT_WAND or DAAT_MAXSCORE algorithm."); + Status::invalid_args, "search time k1/b must equal load time config for DAAT_* algorithm."); } return GetDocValueBM25Computer(bm25_params_->k1, bm25_params_->b, avgdl); } else { @@ -187,12 +208,6 @@ class InvertedIndex : public BaseInvertedIndex { writeBinaryPOD(writer, n_rows_internal_); writeBinaryPOD(writer, max_dim_); writeBinaryPOD(writer, deprecated_value_threshold); - BitsetView bitset(nullptr, 0); - - std::vector> cursors; - for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { - cursors.emplace_back(inverted_index_ids_[i], inverted_index_vals_[i], n_rows_internal_, 0, 0, bitset); - } auto dim_map_reverse = std::unordered_map(); for (const auto& [dim, dim_id] : dim_map_) { @@ -230,6 +245,7 @@ class InvertedIndex : public BaseInvertedIndex { return Status::success; } + Status Load(MemoryIOReader& reader, int map_flags, const std::string& supplement_target_filename) override { DType deprecated_value_threshold; @@ -243,7 +259,7 @@ class InvertedIndex : public BaseInvertedIndex { readBinaryPOD(reader, deprecated_value_threshold); if constexpr (mmapped) { - RETURN_IF_ERROR(PrepareMmap(reader, rows, map_flags, supplement_target_filename)); + RETURN_IF_ERROR(prepare_index_mmap(reader, rows, map_flags, supplement_target_filename)); } else { if (metric_type_ == SparseMetricType::METRIC_BM25) { bm25_params_->row_sums.reserve(rows); @@ -263,17 +279,51 @@ class InvertedIndex : public BaseInvertedIndex { reader.read(raw_row.data(), count * SparseRow::element_size()); } } - add_row_to_index(raw_row, i); + add_row_to_index(raw_row, i, false); } n_rows_internal_ = rows; + // prepare and generate blockmax information + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND) { + if constexpr (mmapped) { + RETURN_IF_ERROR(prepare_blockmax_mmap(map_flags, supplement_target_filename + ".blockmax")); + } else { + blockmax_last_block_sizes_.resize(inverted_index_ids_.size(), 0); + blockmax_ids_.resize(inverted_index_ids_.size()); + blockmax_scores_.resize(inverted_index_ids_.size()); + } + + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + auto& ids = inverted_index_ids_[i]; + auto& vals = inverted_index_vals_[i]; + for (size_t j = 0; j < ids.size(); ++j) { + float score = static_cast(vals[j]); + if (metric_type_ == SparseMetricType::METRIC_BM25) { + score = bm25_params_->max_score_computer(vals[j], bm25_params_->row_sums.at(ids[j])); + } + if (blockmax_last_block_sizes_[i] == 0) { + blockmax_ids_[i].emplace_back(ids[j]); + blockmax_scores_[i].emplace_back(score); + } else { + blockmax_ids_[i].back() = ids[j]; + blockmax_scores_[i].back() = std::max(blockmax_scores_[i].back(), score); + } + if (++blockmax_last_block_sizes_[i] >= blockmax_block_size_) { + blockmax_last_block_sizes_[i] = 0; + } + } + } + } + return Status::success; } // memory in reader must be guaranteed to be valid during the lifetime of this object. Status - PrepareMmap(MemoryIOReader& reader, size_t rows, int map_flags, const std::string& supplement_target_filename) { + prepare_index_mmap(MemoryIOReader& reader, size_t rows, int map_flags, + const std::string& supplement_target_filename) { const auto initial_reader_location = reader.tellg(); const auto nnz = (reader.remaining() - (rows * sizeof(size_t))) / SparseRow::element_size(); @@ -307,7 +357,7 @@ class InvertedIndex : public BaseInvertedIndex { map_byte_size_ = inverted_index_ids_byte_size + inverted_index_vals_byte_size + plists_ids_byte_size + plists_vals_byte_size; - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { map_byte_size_ += max_score_in_dim_byte_size; } if (metric_type_ == SparseMetricType::METRIC_BM25) { @@ -356,7 +406,7 @@ class InvertedIndex : public BaseInvertedIndex { inverted_index_vals_.initialize(ptr, inverted_index_vals_byte_size); ptr += inverted_index_vals_byte_size; - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { max_score_in_dim_.initialize(ptr, max_score_in_dim_byte_size); ptr += max_score_in_dim_byte_size; } @@ -381,17 +431,91 @@ class InvertedIndex : public BaseInvertedIndex { size_t dim_id = 0; for (const auto& [idx, count] : idx_counts) { dim_map_[idx] = dim_id; - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { max_score_in_dim_.emplace_back(0.0f); } ++dim_id; } + // in mmap mode, next_dim_id_ should never be used, but still assigning for consistency. next_dim_id_ = dim_id; return Status::success; } + Status + prepare_blockmax_mmap(int map_flags, const std::string& blockmax_mmap_filename) { + std::ofstream temp_file(blockmax_mmap_filename, std::ios::binary | std::ios::trunc); + if (!temp_file) { + LOG_KNOWHERE_ERROR_ << "Failed to create mmap file when loading sparse InvertedIndex: " << strerror(errno); + return Status::disk_file_error; + } + temp_file.close(); + + size_t blockmax_total_blocks = 0; + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + blockmax_total_blocks += (inverted_index_ids_[i].size() + blockmax_block_size_ - 1) / blockmax_block_size_; + } + auto blockmax_last_block_sizes_byte_size = + inverted_index_ids_.size() * sizeof(typename decltype(blockmax_last_block_sizes_)::value_type); + auto blockmax_ids_dim0_byte_size = + inverted_index_ids_.size() * sizeof(typename decltype(blockmax_ids_)::value_type); + auto blockmax_scores_dim0_byte_size = + inverted_index_ids_.size() * sizeof(typename decltype(blockmax_scores_)::value_type); + auto blockmax_total_blocks_byte_size = + blockmax_total_blocks * (sizeof(typename decltype(blockmax_ids_)::value_type::value_type) + + sizeof(typename decltype(blockmax_scores_)::value_type::value_type)); + blockmax_map_byte_size_ = blockmax_last_block_sizes_byte_size + blockmax_ids_dim0_byte_size + + blockmax_scores_dim0_byte_size + blockmax_total_blocks_byte_size; + std::filesystem::resize_file(blockmax_mmap_filename, blockmax_map_byte_size_); + + blockmax_map_fd_ = open(blockmax_mmap_filename.c_str(), O_RDWR); + if (blockmax_map_fd_ == -1) { + LOG_KNOWHERE_ERROR_ << "Failed to open mmap file when loading sparse InvertedIndex: " << strerror(errno); + return Status::disk_file_error; + } + std::filesystem::remove(blockmax_mmap_filename); + // clear MAP_PRIVATE flag: we need to write to this mmapped memory/file, + // MAP_PRIVATE triggers copy-on-write and uses extra anonymous memory. + map_flags &= ~MAP_PRIVATE; + map_flags |= MAP_SHARED; + blockmax_map_ = static_cast( + mmap(nullptr, blockmax_map_byte_size_, PROT_READ | PROT_WRITE, map_flags, blockmax_map_fd_, 0)); + if (blockmax_map_ == MAP_FAILED) { + LOG_KNOWHERE_ERROR_ << "Failed to create blockmax mmap when loading sparse InvertedIndex: " + << strerror(errno) << ", size: " << blockmax_map_byte_size_ + << " on file: " << blockmax_mmap_filename; + return Status::disk_file_error; + } + + char* ptr = blockmax_map_; + + blockmax_last_block_sizes_.initialize(ptr, blockmax_last_block_sizes_byte_size); + ptr += blockmax_last_block_sizes_byte_size; + blockmax_ids_.initialize(ptr, blockmax_ids_dim0_byte_size); + ptr += blockmax_ids_dim0_byte_size; + blockmax_scores_.initialize(ptr, blockmax_scores_dim0_byte_size); + ptr += blockmax_scores_dim0_byte_size; + + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + blockmax_last_block_sizes_[i] = 0; + } + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + auto bcount = (inverted_index_ids_[i].size() + blockmax_block_size_ - 1) / blockmax_block_size_; + auto& bids = blockmax_ids_.emplace_back(); + bids.initialize(ptr, bcount * sizeof(typename decltype(blockmax_ids_)::value_type::value_type)); + ptr += bcount * sizeof(typename decltype(blockmax_ids_)::value_type::value_type); + } + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + auto bcount = (inverted_index_ids_[i].size() + blockmax_block_size_ - 1) / blockmax_block_size_; + auto& bscores = blockmax_scores_.emplace_back(); + bscores.initialize(ptr, bcount * sizeof(typename decltype(blockmax_scores_)::value_type::value_type)); + ptr += bcount * sizeof(typename decltype(blockmax_scores_)::value_type::value_type); + } + + return Status::success; + } + // Non zero drop ratio is only supported for static index, i.e. data should // include all rows that'll be added to the index. Status @@ -417,8 +541,9 @@ class InvertedIndex : public BaseInvertedIndex { bm25_params_->row_sums.reserve(current_rows + rows); } for (size_t i = 0; i < rows; ++i) { - add_row_to_index(data[i], current_rows + i); + add_row_to_index(data[i], current_rows + i, true); } + n_rows_internal_ += rows; return Status::success; @@ -441,11 +566,15 @@ class InvertedIndex : public BaseInvertedIndex { } MaxMinHeap heap(k * approx_params.refine_factor); - // DAAT_WAND and DAAT_MAXSCORE are based on the implementation in PISA. + // DAAT related algorithms are based on the implementation in PISA. if constexpr (algo == InvertedIndexAlgo::DAAT_WAND) { search_daat_wand(q_vec, heap, bitset, computer, approx_params.dim_max_score_ratio); } else if constexpr (algo == InvertedIndexAlgo::DAAT_MAXSCORE) { search_daat_maxscore(q_vec, heap, bitset, computer, approx_params.dim_max_score_ratio); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + search_daat_blockmax_maxscore(q_vec, heap, bitset, computer, approx_params.dim_max_score_ratio); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND) { + search_daat_blockmax_wand(q_vec, heap, bitset, computer, approx_params.dim_max_score_ratio); } else { search_taat_naive(q_vec, heap, bitset, computer); } @@ -525,9 +654,22 @@ class InvertedIndex : public BaseInvertedIndex { res += sizeof(typename decltype(inverted_index_vals_)::value_type::value_type) * inverted_index_vals_[i].capacity(); } - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { res += sizeof(typename decltype(max_score_in_dim_)::value_type) * max_score_in_dim_.capacity(); } + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + res += sizeof(typename decltype(blockmax_ids_)::value_type) * blockmax_ids_.capacity(); + res += sizeof(typename decltype(blockmax_scores_)::value_type) * blockmax_scores_.capacity(); + for (size_t i = 0; i < blockmax_ids_.size(); ++i) { + res += + sizeof(typename decltype(blockmax_ids_)::value_type::value_type) * blockmax_ids_[i].capacity(); + } + for (size_t i = 0; i < blockmax_scores_.size(); ++i) { + res += sizeof(typename decltype(blockmax_scores_)::value_type::value_type) * + blockmax_scores_[i].capacity(); + } + } return res; } } @@ -639,6 +781,40 @@ class InvertedIndex : public BaseInvertedIndex { } }; // struct Cursor + template + struct BlockMaxCursor : public Cursor { + public: + BlockMaxCursor(const Vector& plist_ids, const Vector& plist_vals, + const Vector& pblk_max_ids, const Vector& pblk_max_scores, size_t num_vec, + float max_score, float q_value, DocIdFilter filter, float block_max_score_ratio) + : Cursor(plist_ids, plist_vals, num_vec, max_score, q_value, filter), + pblk_max_ids_(pblk_max_ids), + pblk_max_scores_(pblk_max_scores), + scaled_q_value_(q_value * block_max_score_ratio) { + } + void + seek_block(table_t vec_id) { + while (bloc_ < pblk_max_ids_.size() && pblk_max_ids_[bloc_] < vec_id) { + ++bloc_; + } + cur_block_end_vec_id_ = (bloc_ >= pblk_max_ids_.size()) ? this->total_num_vec_ : pblk_max_ids_[bloc_]; + } + + [[nodiscard]] float + cur_block_max_score() const { + if (bloc_ >= pblk_max_scores_.size()) { + return 0; + } + return scaled_q_value_ * pblk_max_scores_[bloc_]; + } + + const Vector& pblk_max_ids_; + const Vector& pblk_max_scores_; + float scaled_q_value_ = 0.0f; + table_t cur_block_end_vec_id_ = 0; + size_t bloc_ = 0; + }; // struct BlockMaxCursor + std::vector> parse_query(const SparseRow& query, float drop_ratio_search) const { DType q_threshold = 0; @@ -679,6 +855,24 @@ class InvertedIndex : public BaseInvertedIndex { return cursors; } + template + std::vector> + make_blockmax_cursors(const std::vector>& q_vec, const DocValueComputer& computer, + DocIdFilter& filter, float dim_max_score_ratio, float block_max_score_ratio = 1.05) const { + std::vector> cursors; + cursors.reserve(q_vec.size()); + for (auto q : q_vec) { + auto& plist_ids = inverted_index_ids_[q.first]; + auto& plist_vals = inverted_index_vals_[q.first]; + auto& pblk_max_ids = blockmax_ids_[q.first]; + auto& pblk_max_scores = blockmax_scores_[q.first]; + cursors.emplace_back(plist_ids, plist_vals, pblk_max_ids, pblk_max_scores, n_rows_internal_, + max_score_in_dim_[q.first] * q.second * dim_max_score_ratio, q.second, filter, + block_max_score_ratio); + } + return cursors; + } + // find the top-k candidates using brute force search, k as specified by the capacity of the heap. // any value in q_vec that is smaller than q_threshold and any value with dimension >= n_cols() will be ignored. // TODO: may switch to row-wise brute force if filter rate is high. Benchmark needed. @@ -849,6 +1043,206 @@ class InvertedIndex : public BaseInvertedIndex { } } + template + void + search_daat_blockmax_wand(std::vector>& q_vec, MaxMinHeap& heap, + DocIdFilter& filter, const DocValueComputer& computer, + float dim_max_score_ratio) const { + std::vector> cursors = + make_blockmax_cursors(q_vec, computer, filter, dim_max_score_ratio); + std::vector*> cursor_ptrs(cursors.size()); + for (size_t i = 0; i < cursors.size(); ++i) { + cursor_ptrs[i] = &cursors[i]; + } + + auto sort_cursors = [&cursor_ptrs] { + std::sort(cursor_ptrs.begin(), cursor_ptrs.end(), + [](auto& x, auto& y) { return x->cur_vec_id_ < y->cur_vec_id_; }); + }; + sort_cursors(); + + while (true) { + float threshold = heap.full() ? heap.top().val : 0; + float global_upper_bound = 0; + table_t pivot_id; + size_t pivot; + bool found_pivot = false; + + for (pivot = 0; pivot < cursor_ptrs.size(); ++pivot) { + if (cursor_ptrs[pivot]->cur_vec_id_ >= n_rows_internal_) { + break; + } + global_upper_bound += cursor_ptrs[pivot]->max_score_; + if (global_upper_bound > threshold) { + found_pivot = true; + pivot_id = cursor_ptrs[pivot]->cur_vec_id_; + for (; pivot + 1 < cursor_ptrs.size(); ++pivot) { + if (cursor_ptrs[pivot + 1]->cur_vec_id_ != pivot_id) { + break; + } + } + break; + } + } + + if (!found_pivot) { + break; + } + + float block_upper_bound = 0.0f; + for (size_t i = 0; i <= pivot; ++i) { + if (cursor_ptrs[i]->cur_block_end_vec_id_ < pivot_id) { + cursor_ptrs[i]->seek_block(pivot_id); + } + block_upper_bound += cursor_ptrs[i]->cur_block_max_score(); + } + + if (block_upper_bound > threshold) { + if (pivot_id == cursor_ptrs[0]->cur_vec_id_) { + float score = 0.0f; + float cur_vec_sum = + metric_type_ == SparseMetricType::METRIC_BM25 ? bm25_params_->row_sums.at(pivot_id) : 0; + for (auto& cursor_ptr : cursor_ptrs) { + if (cursor_ptr->cur_vec_id_ != pivot_id) { + break; + } + score += cursor_ptr->q_value_ * computer(cursor_ptr->cur_vec_val(), cur_vec_sum); + cursor_ptr->next(); + } + + heap.push(pivot_id, score); + threshold = heap.full() ? heap.top().val : 0; + sort_cursors(); + } else { + size_t next_list = pivot; + for (; cursor_ptrs[next_list]->cur_vec_id_ == pivot_id; --next_list) { + } + + cursor_ptrs[next_list]->seek(pivot_id); + for (size_t i = next_list + 1; i < cursor_ptrs.size(); ++i) { + if (cursor_ptrs[i]->cur_vec_id_ >= cursor_ptrs[i - 1]->cur_vec_id_) { + break; + } + std::swap(cursor_ptrs[i], cursor_ptrs[i - 1]); + } + } + } else { + table_t cand_id = n_rows_internal_ - 1; + for (size_t i = 0; i <= pivot; ++i) { + if (cursor_ptrs[i]->cur_block_end_vec_id_ < cand_id) { + cand_id = cursor_ptrs[i]->cur_block_end_vec_id_; + } + } + ++cand_id; + + // cursor_ptrs[pivot + 1] must have a vec_id greater than pivot_id, + // and if this condition is met, it means pivot can start from pivot + 1 + if (pivot + 1 < cursor_ptrs.size() && cursor_ptrs[pivot + 1]->cur_vec_id_ < cand_id) { + cand_id = cursor_ptrs[pivot + 1]->cur_vec_id_; + } + assert(cand_id > pivot_id); + + size_t next_list = pivot; + while (next_list + 1 > 0) { + cursor_ptrs[next_list]->seek(cand_id); + if (cursor_ptrs[next_list]->cur_vec_id_ != cand_id) { + for (size_t i = next_list + 1; i < cursor_ptrs.size(); ++i) { + if (cursor_ptrs[i]->cur_vec_id_ >= cursor_ptrs[i - 1]->cur_vec_id_) { + break; + } + std::swap(cursor_ptrs[i], cursor_ptrs[i - 1]); + } + break; + } + --next_list; + } + } + } + } + + template + void + search_daat_blockmax_maxscore(std::vector>& q_vec, MaxMinHeap& heap, + DocIdFilter& filter, const DocValueComputer& computer, + float dim_max_score_ratio) const { + std::sort(q_vec.begin(), q_vec.end(), [this](auto& a, auto& b) { + return a.second * max_score_in_dim_[a.first] > b.second * max_score_in_dim_[b.first]; + }); + std::vector> cursors = + make_blockmax_cursors(q_vec, computer, filter, dim_max_score_ratio); + + std::vector upper_bounds(cursors.size() + 1, 0.0f); + float bound_sum = 0.0f; + for (size_t i = cursors.size() - 1; i + 1 > 0; --i) { + bound_sum += cursors[i].max_score_; + upper_bounds[i] = bound_sum; + } + + float threshold = heap.full() ? heap.top().val : 0; + + table_t ne_start_cursor_id = cursors.size(); + uint64_t curr_cand_vec_id = (*std::min_element(cursors.begin(), cursors.end(), [](auto&& lhs, auto&& rhs) { + return lhs.cur_vec_id_ < rhs.cur_vec_id_; + })).cur_vec_id_; + + while (ne_start_cursor_id > 0 && curr_cand_vec_id < n_rows_internal_) { + float score = 0; + table_t next_cand_vec_id = n_rows_internal_; + float cur_vec_sum = + metric_type_ == SparseMetricType::METRIC_BM25 ? bm25_params_->row_sums.at(curr_cand_vec_id) : 0; + + // score essential list and find next + for (size_t i = 0; i < ne_start_cursor_id; ++i) { + if (cursors[i].cur_vec_id_ == curr_cand_vec_id) { + score += cursors[i].q_value_ * computer(cursors[i].cur_vec_val(), cur_vec_sum); + cursors[i].next(); + } + if (cursors[i].cur_vec_id_ < next_cand_vec_id) { + next_cand_vec_id = cursors[i].cur_vec_id_; + } + } + + auto new_score = score + upper_bounds[ne_start_cursor_id]; + if (new_score > threshold) { + // update block index for non-essential list and check block upper bound + for (size_t i = ne_start_cursor_id; i < cursors.size(); ++i) { + if (cursors[i].cur_block_end_vec_id_ < curr_cand_vec_id) { + cursors[i].seek_block(curr_cand_vec_id); + } + new_score -= cursors[i].max_score_ - cursors[i].cur_block_max_score(); + if (new_score <= threshold) { + break; + } + } + if (new_score > threshold) { + // try to complete evaluation with non-essential lists + for (size_t i = ne_start_cursor_id; i < cursors.size(); ++i) { + cursors[i].seek(curr_cand_vec_id); + if (cursors[i].cur_vec_id_ == curr_cand_vec_id) { + new_score += cursors[i].q_value_ * computer(cursors[i].cur_vec_val(), cur_vec_sum); + } + new_score -= cursors[i].cur_block_max_score(); + + if (new_score <= threshold) { + break; + } + } + score = new_score; + } + if (score > threshold) { + heap.push(curr_cand_vec_id, score); + threshold = heap.full() ? heap.top().val : 0; + // update non-essential lists + while (ne_start_cursor_id != 0 && upper_bounds[ne_start_cursor_id - 1] <= threshold) { + --ne_start_cursor_id; + } + } + } + + curr_cand_vec_id = next_cand_vec_id; + } + } + void refine_and_collect(const SparseRow& query, MaxMinHeap& inacc_heap, size_t k, float* distances, label_t* labels, const DocValueComputer& computer, @@ -875,9 +1269,14 @@ class InvertedIndex : public BaseInvertedIndex { search_daat_wand(q_vec, heap, filter, computer, dim_max_score_ratio); } else if constexpr (algo == InvertedIndexAlgo::DAAT_MAXSCORE) { search_daat_maxscore(q_vec, heap, filter, computer, dim_max_score_ratio); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND) { + search_daat_blockmax_wand(q_vec, heap, filter, computer, dim_max_score_ratio); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + search_daat_blockmax_maxscore(q_vec, heap, filter, computer, dim_max_score_ratio); } else { search_taat_naive(q_vec, heap, filter, computer); } + collect_result(heap, distances, labels); } @@ -893,7 +1292,7 @@ class InvertedIndex : public BaseInvertedIndex { } inline void - add_row_to_index(const SparseRow& row, table_t vec_id) { + add_row_to_index(const SparseRow& row, table_t vec_id, bool is_append) { [[maybe_unused]] float row_sum = 0; for (size_t j = 0; j < row.size(); ++j) { auto [dim, val] = row[j]; @@ -913,18 +1312,49 @@ class InvertedIndex : public BaseInvertedIndex { dim_it = dim_map_.insert({dim, next_dim_id_++}).first; inverted_index_ids_.emplace_back(); inverted_index_vals_.emplace_back(); - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { max_score_in_dim_.emplace_back(0.0f); + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + if (is_append) { + blockmax_ids_.emplace_back(); + blockmax_scores_.emplace_back(); + blockmax_last_block_sizes_.emplace_back(0); + } + } } } inverted_index_ids_[dim_it->second].emplace_back(vec_id); inverted_index_vals_[dim_it->second].emplace_back(get_quant_val(val)); - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { auto score = static_cast(val); if (metric_type_ == SparseMetricType::METRIC_BM25) { score = bm25_params_->max_score_computer(val, row_sum); } max_score_in_dim_[dim_it->second] = std::max(max_score_in_dim_[dim_it->second], score); + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + if (is_append) { + size_t cur_block_size = blockmax_last_block_sizes_[dim_it->second]; + if (cur_block_size == 0) { + // create a new block and add the first element + blockmax_ids_[dim_it->second].emplace_back(vec_id); + blockmax_scores_[dim_it->second].emplace_back(score); + } else { + // change the element of the last block + blockmax_ids_[dim_it->second].back() = vec_id; + if (score > blockmax_scores_[dim_it->second].back()) { + blockmax_scores_[dim_it->second].back() = score; + } + } + // update the last block size + ++cur_block_size; + if (cur_block_size >= blockmax_block_size_) { + cur_block_size = 0; + } + blockmax_last_block_sizes_[dim_it->second] = cur_block_size; + } + } } } if (metric_type_ == SparseMetricType::METRIC_BM25) { @@ -955,6 +1385,10 @@ class InvertedIndex : public BaseInvertedIndex { Vector> inverted_index_ids_; Vector> inverted_index_vals_; Vector max_score_in_dim_; + Vector> blockmax_ids_; + Vector> blockmax_scores_; + Vector blockmax_last_block_sizes_; + size_t blockmax_block_size_ = 0; SparseMetricType metric_type_; @@ -966,6 +1400,10 @@ class InvertedIndex : public BaseInvertedIndex { size_t map_byte_size_ = 0; int map_fd_ = -1; + char* blockmax_map_ = nullptr; + size_t blockmax_map_byte_size_ = 0; + int blockmax_map_fd_ = -1; + struct BM25Params { float k1; float b; diff --git a/src/index/sparse/sparse_inverted_index_config.h b/src/index/sparse/sparse_inverted_index_config.h index f92b8a03c..6c75c3962 100644 --- a/src/index/sparse/sparse_inverted_index_config.h +++ b/src/index/sparse/sparse_inverted_index_config.h @@ -24,6 +24,7 @@ class SparseInvertedIndexConfig : public BaseConfig { CFG_INT refine_factor; CFG_FLOAT dim_max_score_ratio; CFG_STRING inverted_index_algo; + CFG_INT blockmax_block_size; KNOHWERE_DECLARE_CONFIG(SparseInvertedIndexConfig) { // NOTE: drop_ratio_build has been deprecated, it won't change anything KNOWHERE_CONFIG_DECLARE_FIELD(drop_ratio_build) @@ -87,18 +88,26 @@ class SparseInvertedIndexConfig : public BaseConfig { .for_train() .for_deserialize() .for_deserialize_from_file(); + KNOWHERE_CONFIG_DECLARE_FIELD(blockmax_block_size) + .description("block size for blockmax-based algorithms") + .set_default(64) + .set_range(1, 65535, true, true) + .for_train_and_search() + .for_deserialize() + .for_deserialize_from_file(); } Status CheckAndAdjust(PARAM_TYPE param_type, std::string* err_msg) override { if (param_type == PARAM_TYPE::TRAIN) { - constexpr std::array legal_inverted_index_algo_list{"TAAT_NAIVE", "DAAT_WAND", - "DAAT_MAXSCORE"}; + constexpr std::array legal_inverted_index_algo_list{ + "TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE", "DAAT_BLOCKMAX_WAND", "DAAT_BLOCKMAX_MAXSCORE"}; std::string inverted_index_algo_str = inverted_index_algo.value_or(""); if (std::find(legal_inverted_index_algo_list.begin(), legal_inverted_index_algo_list.end(), inverted_index_algo_str) == legal_inverted_index_algo_list.end()) { std::string msg = "sparse inverted index algo " + inverted_index_algo_str + - " not found or not supported, supported: [TAAT_NAIVE DAAT_WAND DAAT_MAXSCORE]"; + " not found or not supported, supported: [TAAT_NAIVE DAAT_WAND DAAT_MAXSCORE " + "DAAT_BLOCKMAX_WAND DAAT_BLOCKMAX_MAXSCORE]"; return HandleError(err_msg, msg, Status::invalid_args); } } diff --git a/tests/ut/test_sparse.cc b/tests/ut/test_sparse.cc index 576330ec0..1cb4ac563 100644 --- a/tests/ut/test_sparse.cc +++ b/tests/ut/test_sparse.cc @@ -47,7 +47,8 @@ TEST_CASE("Test Mem Sparse Index With Float Vector", "[float metrics]") { auto metric = GENERATE(knowhere::metric::IP, knowhere::metric::BM25); - auto inverted_index_algo = GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE"); + std::string inverted_index_algo = + GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE", "DAAT_BLOCKMAX_WAND", "DAAT_BLOCKMAX_MAXSCORE"); auto drop_ratio_search = metric == knowhere::metric::BM25 ? GENERATE(0.0, 0.1) : GENERATE(0.0, 0.3); @@ -69,6 +70,9 @@ TEST_CASE("Test Mem Sparse Index With Float Vector", "[float metrics]") { knowhere::Json json = base_gen(); json[knowhere::indexparam::DROP_RATIO_SEARCH] = drop_ratio_search; json[knowhere::indexparam::INVERTED_INDEX_ALGO] = inverted_index_algo; + if (inverted_index_algo == "DAAT_BLOCKMAX_WAND" || inverted_index_algo == "DAAT_BLOCKMAX_MAXSCORE") { + json[knowhere::indexparam::BLOCKMAX_BLOCK_SIZE] = GENERATE(1, 2, 64, 128); + } return json; }; @@ -464,7 +468,8 @@ TEST_CASE("Test Mem Sparse Index CC", "[float metrics]") { auto query_ds = doc_vector_gen(nq, dim); - auto inverted_index_algo = GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE"); + std::string inverted_index_algo = + GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE", "DAAT_BLOCKMAX_WAND", "DAAT_BLOCKMAX_MAXSCORE"); auto drop_ratio_search = GENERATE(0.0, 0.3); @@ -487,6 +492,9 @@ TEST_CASE("Test Mem Sparse Index CC", "[float metrics]") { knowhere::Json json = base_gen(); json[knowhere::indexparam::DROP_RATIO_SEARCH] = drop_ratio_search; json[knowhere::indexparam::INVERTED_INDEX_ALGO] = inverted_index_algo; + if (inverted_index_algo == "DAAT_BLOCKMAX_WAND" || inverted_index_algo == "DAAT_BLOCKMAX_MAXSCORE") { + json[knowhere::indexparam::BLOCKMAX_BLOCK_SIZE] = GENERATE(1, 2, 64, 128); + } return json; };