Skip to content

Commit

Permalink
feat: support keyword text match (milvus-io#35923)
Browse files Browse the repository at this point in the history
fix: milvus-io#35922

---------

Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan authored Sep 10, 2024
1 parent b0939fd commit 89bf226
Show file tree
Hide file tree
Showing 88 changed files with 3,359 additions and 936 deletions.
3 changes: 3 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type component interface {

const (
// TmpInvertedIndexPrefix is a local directory that runQueryNode and
// runIndexNode pass to cleanLocalDir before starting their component.
TmpInvertedIndexPrefix = "/tmp/milvus/inverted-index/"
// TmpTextLogPrefix is a local directory that runQueryNode and runIndexNode
// pass to cleanLocalDir before starting their component.
TmpTextLogPrefix = "/tmp/milvus/text-log/"
)

func cleanLocalDir(path string) {
Expand Down Expand Up @@ -209,6 +210,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync
cleanLocalDir(mmapDir)
}
cleanLocalDir(TmpInvertedIndexPrefix)
cleanLocalDir(TmpTextLogPrefix)

return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
}
Expand Down Expand Up @@ -239,6 +241,7 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, wg *sync
indexDataLocalPath := filepath.Join(rootPath, typeutil.IndexNodeRole)
cleanLocalDir(indexDataLocalPath)
cleanLocalDir(TmpInvertedIndexPrefix)
cleanLocalDir(TmpTextLogPrefix)

return runComponent(ctx, localMsg, wg, components.NewIndexNode, metrics.RegisterIndexNode)
}
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/Consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const char NUM_CLUSTERS[] = "num_clusters";
const char KMEANS_CLUSTER[] = "KMEANS";
const char VEC_OPT_FIELDS[] = "opt_fields";
const char PAGE_RETAIN_ORDER[] = "page_retain_order";
// NOTE(review): presumably the path component under which text-match index
// data is stored -- confirm against the users of this constant.
const char TEXT_LOG_ROOT_PATH[] = "text_log";

const char DEFAULT_PLANNODE_ID[] = "0";
// NOTE(review): "DEAFULT" looks like a misspelling of "DEFAULT"; the
// identifier is left as-is because renaming it requires updating every user.
const char DEAFULT_QUERY_ID[] = "0";
Expand Down
131 changes: 131 additions & 0 deletions internal/core/src/common/FieldMeta.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include "common/FieldMeta.h"
#include "common/SystemProperty.h"
#include "common/protobuf_utils.h"

#include <boost/lexical_cast.hpp>

#include "Consts.h"

namespace milvus {
// Extracts tokenizer settings from a field's type params.
//
// Looks up the "analyzer_params" entry, parses its value as JSON and flattens
// every top-level member into a string -> string map: string members are
// stored verbatim, every other JSON value is stored in its serialized form.
// Returns an empty map when "analyzer_params" is absent. Malformed JSON is not
// handled here; whatever nlohmann::json::parse throws propagates to the caller.
TokenizerParams
ParseTokenizerParams(const TypeParams& params) {
    auto iter = params.find("analyzer_params");
    if (iter == params.end()) {
        return {};
    }
    nlohmann::json j = nlohmann::json::parse(iter->second);
    TokenizerParams ret;
    for (const auto& [k, v] : j.items()) {
        // Branch on the JSON type instead of relying on the exception that
        // get<std::string>() raises for every non-string value.
        ret[k] = v.is_string() ? v.get<std::string>() : v.dump();
    }
    return ret;
}

// Reports whether text match is switched on for this field: true only for a
// string-typed field that carries string info with its enable_match flag set.
bool
FieldMeta::enable_match() const {
    return IsStringDataType(type_) && string_info_.has_value() &&
           string_info_->enable_match;
}

// Returns the tokenizer settings parsed from this field's stored type params.
// Asserts that text match is enabled for the field.
TokenizerParams
FieldMeta::get_tokenizer_params() const {
    Assert(enable_match());
    // ParseTokenizerParams only reads its argument, so pass the stored map
    // directly instead of copying it into a local first.
    return ParseTokenizerParams(string_info_->params);
}

// Builds a FieldMeta from its protobuf FieldSchema description.
//
// Field ids below 100 are treated as system fields and must pass
// SystemProperty::SystemFieldVerify. The rest depends on the data type:
//   - vector: requires a "dim" type param unless the type is sparse float
//     (dim stays 0); "metric_type" is taken from the index params if present.
//   - string: requires MAX_LENGTH; "enable_match" is parsed case-insensitively
//     as a boolean and defaults to false.
//   - array: carries the element type.
//   - anything else: only name, id, type and nullability.
FieldMeta
FieldMeta::ParseFrom(const milvus::proto::schema::FieldSchema& schema_proto) {
    auto field_id = FieldId(schema_proto.fieldid());
    auto name = FieldName(schema_proto.name());
    auto nullable = schema_proto.nullable();
    if (field_id.get() < 100) {
        // system field id
        auto is_system =
            SystemProperty::Instance().SystemFieldVerify(name, field_id);
        AssertInfo(is_system,
                   "invalid system type: name(" + name.get() + "), id(" +
                       std::to_string(field_id.get()) + ")");
    }

    auto data_type = DataType(schema_proto.data_type());

    if (IsVectorDataType(data_type)) {
        auto type_map = RepeatedKeyValToMap(schema_proto.type_params());
        auto index_map = RepeatedKeyValToMap(schema_proto.index_params());

        // Sparse float vectors carry no fixed dimension, so dim stays 0.
        int64_t dim = 0;
        if (!IsSparseFloatVectorDataType(data_type)) {
            AssertInfo(type_map.count("dim"), "dim not found");
            dim = boost::lexical_cast<int64_t>(type_map.at("dim"));
        }
        if (!index_map.count("metric_type")) {
            return FieldMeta{
                name, field_id, data_type, dim, std::nullopt, false};
        }
        auto metric_type = index_map.at("metric_type");
        return FieldMeta{name, field_id, data_type, dim, metric_type, false};
    }

    if (IsStringDataType(data_type)) {
        auto type_map = RepeatedKeyValToMap(schema_proto.type_params());
        AssertInfo(type_map.count(MAX_LENGTH), "max_length not found");
        auto max_len = boost::lexical_cast<int64_t>(type_map.at(MAX_LENGTH));
        bool enable_match = false;
        if (auto it = type_map.find("enable_match"); it != type_map.end()) {
            auto param_str = it->second;
            // Go through unsigned char: calling tolower with a negative char
            // value is undefined behavior.
            std::transform(param_str.begin(),
                           param_str.end(),
                           param_str.begin(),
                           [](unsigned char c) {
                               return static_cast<char>(::tolower(c));
                           });

            auto bool_cast = [](const std::string& arg) -> bool {
                std::istringstream ss(arg);
                // Must start out as false: for an empty or whitespace-only
                // value the extraction never writes to b.
                bool b = false;
                ss >> std::boolalpha >> b;
                return b;
            };

            enable_match = bool_cast(param_str);
        }
        return FieldMeta{name,
                         field_id,
                         data_type,
                         max_len,
                         nullable,
                         enable_match,
                         type_map};
    }

    if (IsArrayDataType(data_type)) {
        return FieldMeta{name,
                         field_id,
                         data_type,
                         DataType(schema_proto.element_type()),
                         nullable};
    }

    return FieldMeta{name, field_id, data_type, nullable};
}

} // namespace milvus
32 changes: 32 additions & 0 deletions internal/core/src/common/FieldMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
#include "common/Types.h"

namespace milvus {
// Key/value type params of a field, as string pairs.
using TypeParams = std::map<std::string, std::string>;
// Tokenizer settings flattened to string key/value pairs.
using TokenizerParams = std::map<std::string, std::string>;

// Parses the JSON stored under the "analyzer_params" key of `params` into
// tokenizer settings; returns an empty map when that key is absent.
TokenizerParams
ParseTokenizerParams(const TypeParams& params);

class FieldMeta {
public:
Expand Down Expand Up @@ -53,6 +58,21 @@ class FieldMeta {
Assert(IsStringDataType(type_));
}

// Constructs a string field that also records its text-match settings.
//
// `params` holds the field's type params. It is taken by value and moved into
// the stored StringInfo, so the caller's map is never modified; callers that
// no longer need their map can std::move it in to avoid the copy.
// Asserts that `type` is a string data type.
FieldMeta(const FieldName& name,
          FieldId id,
          DataType type,
          int64_t max_length,
          bool nullable,
          bool enable_match,
          std::map<std::string, std::string> params)
    : name_(name),
      id_(id),
      type_(type),
      string_info_(StringInfo{max_length, enable_match, std::move(params)}),
      nullable_(nullable) {
    Assert(IsStringDataType(type_));
}

FieldMeta(const FieldName& name,
FieldId id,
DataType type,
Expand Down Expand Up @@ -99,6 +119,12 @@ class FieldMeta {
return string_info_->max_length;
}

// Returns true only when this is a string field whose string info is present
// and has its enable_match flag set; false for every other field.
bool
enable_match() const;

// Returns the tokenizer settings parsed from this field's stored type params.
// Asserts that enable_match() is true.
TokenizerParams
get_tokenizer_params() const;

std::optional<knowhere::MetricType>
get_metric_type() const {
Assert(IsVectorDataType(type_));
Expand Down Expand Up @@ -160,13 +186,19 @@ class FieldMeta {
}
}

public:
// Builds a FieldMeta from a protobuf FieldSchema, selecting the vector,
// string, array or plain form according to the schema's data type.
static FieldMeta
ParseFrom(const milvus::proto::schema::FieldSchema& schema_proto);

private:
// Vector-specific metadata.
// NOTE(review): presumably populated by the vector-field constructors -- confirm.
struct VectorInfo {
// Vector dimension.
int64_t dim_;
// Metric type of the field; empty when none was provided.
std::optional<knowhere::MetricType> metric_type_;
};
// String-specific metadata.
struct StringInfo {
// Maximum length of the string field.
int64_t max_length;
// Whether text match is enabled for the field.
bool enable_match;
// Type params of the field; get_tokenizer_params() parses the tokenizer
// settings out of this map.
std::map<std::string, std::string> params;
};
FieldName name_;
FieldId id_;
Expand Down
46 changes: 2 additions & 44 deletions internal/core/src/common/Schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,9 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
for (const milvus::proto::schema::FieldSchema& child :
schema_proto.fields()) {
auto field_id = FieldId(child.fieldid());
auto name = FieldName(child.name());
auto nullable = child.nullable();
if (field_id.get() < 100) {
// system field id
auto is_system =
SystemProperty::Instance().SystemFieldVerify(name, field_id);
AssertInfo(is_system,
"invalid system type: name(" + name.get() + "), id(" +
std::to_string(field_id.get()) + ")");
}

auto data_type = DataType(child.data_type());

if (IsVectorDataType(data_type)) {
auto type_map = RepeatedKeyValToMap(child.type_params());
auto index_map = RepeatedKeyValToMap(child.index_params());

int64_t dim = 0;
if (!IsSparseFloatVectorDataType(data_type)) {
AssertInfo(type_map.count("dim"), "dim not found");
dim = boost::lexical_cast<int64_t>(type_map.at("dim"));
}
if (!index_map.count("metric_type")) {
schema->AddField(
name, field_id, data_type, dim, std::nullopt, false);
} else {
auto metric_type = index_map.at("metric_type");
schema->AddField(
name, field_id, data_type, dim, metric_type, false);
}
} else if (IsStringDataType(data_type)) {
auto type_map = RepeatedKeyValToMap(child.type_params());
AssertInfo(type_map.count(MAX_LENGTH), "max_length not found");
auto max_len =
boost::lexical_cast<int64_t>(type_map.at(MAX_LENGTH));
schema->AddField(name, field_id, data_type, max_len, nullable);
} else if (IsArrayDataType(data_type)) {
schema->AddField(name,
field_id,
data_type,
DataType(child.element_type()),
nullable);
} else {
schema->AddField(name, field_id, data_type, nullable);
}
auto f = FieldMeta::ParseFrom(child);
schema->AddField(std::move(f));

if (child.is_primary_key()) {
AssertInfo(!schema->get_primary_field_id().has_value(),
Expand Down
14 changes: 14 additions & 0 deletions internal/core/src/common/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ class Schema {
this->AddField(std::move(field_meta));
}

// string type
void
AddField(const FieldName& name,
const FieldId id,
DataType data_type,
int64_t max_length,
bool nullable,
bool enable_match,
std::map<std::string, std::string>& params) {
auto field_meta = FieldMeta(
name, id, data_type, max_length, nullable, enable_match, params);
this->AddField(std::move(field_meta));
}

// vector type
void
AddField(const FieldName& name,
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ typedef struct CMmapConfig {
uint64_t disk_limit;
uint64_t fix_file_size;
bool growing_enable_mmap;
bool enable_mmap;
} CMmapConfig;

typedef struct CTraceConfig {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
# or implied. See the License for the specific language governing permissions and limitations under the License

add_source_at_current_directory_recursively()
add_library(milvus_exec OBJECT ${SOURCE_FILES})
add_library(milvus_exec OBJECT ${SOURCE_FILES})
32 changes: 32 additions & 0 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,35 @@ class SegmentExpr : public Expr {
return result;
}

// Evaluates a text-match predicate through the field's text index and returns
// the bitmap slice belonging to the current batch.
//
// On the first call, `func(index, values...)` is run against the index
// returned by segment_->GetTextIndex(field_id_) and the whole result is cached
// in cached_match_res_; later calls reuse the cache. Every call copies at most
// batch_size_ bits starting at current_data_chunk_pos_ into the returned
// bitmap and advances current_data_chunk_pos_ by the number of bits returned.
template <typename FUNC, typename... ValTypes>
TargetBitmap
ProcessTextMatchIndex(FUNC func, ValTypes... values) {
    if (cached_match_res_ == nullptr) {
        auto index = segment_->GetTextIndex(field_id_);
        // The call already yields a temporary; wrapping it in std::move would
        // only inhibit copy elision.
        auto res = func(index, values...);
        cached_match_res_ = std::make_shared<TargetBitmap>(std::move(res));
        if (cached_match_res_->size() < active_count_) {
            // Some entities are not visible in the inverted index.
            // Only happens on growing segments: pad the cached bitmap up to
            // active_count_ entries.
            TargetBitmap tail(active_count_ - cached_match_res_->size());
            cached_match_res_->append(tail);
        }
    }

    // return batch size, not sure if we should use the data position.
    auto real_batch_size =
        current_data_chunk_pos_ + batch_size_ > active_count_
            ? active_count_ - current_data_chunk_pos_
            : batch_size_;

    TargetBitmap result;
    result.append(
        *cached_match_res_, current_data_chunk_pos_, real_batch_size);
    current_data_chunk_pos_ += real_batch_size;

    return result;
}

template <typename T, typename FUNC, typename... ValTypes>
void
ProcessIndexChunksV2(FUNC func, ValTypes... values) {
Expand Down Expand Up @@ -418,6 +447,9 @@ class SegmentExpr : public Expr {
// Cache for index scan to avoid search index every batch
int64_t cached_index_chunk_id_{-1};
TargetBitmap cached_index_chunk_res_{};

// Cache for text match.
std::shared_ptr<TargetBitmap> cached_match_res_{nullptr};
};

void
Expand Down
15 changes: 15 additions & 0 deletions internal/core/src/exec/expression/UnaryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson() {
template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl() {
if (expr_->op_type_ == proto::plan::OpType::TextMatch) {
return ExecTextMatch();
}

if (CanUseIndex<T>()) {
return ExecRangeVisitorImplForIndex<T>();
} else {
Expand Down Expand Up @@ -857,5 +861,16 @@ PhyUnaryRangeFilterExpr::CanUseIndex() {
return res;
}

// Executes a TextMatch filter: reads the query string from the expression's
// value, runs it through the text index via ProcessTextMatchIndex and wraps
// the resulting bitmap for the current batch in a ColumnVector.
VectorPtr
PhyUnaryRangeFilterExpr::ExecTextMatch() {
    using Index = index::TextMatchIndex;
    auto query = GetValueFromProto<std::string>(expr_->val_);
    // The parameter is deliberately not called `query`, so it does not shadow
    // the local above.
    auto func = [](Index* index, const std::string& text) -> TargetBitmap {
        return index->MatchQuery(text);
    };
    auto res = ProcessTextMatchIndex(func, query);
    return std::make_shared<ColumnVector>(std::move(res));
}

} // namespace exec
} // namespace milvus
Loading

0 comments on commit 89bf226

Please sign in to comment.