Skip to content

Commit

Permalink
Delete pulsar address test and refactor master param table
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored and yefu.chen committed Nov 23, 2020
1 parent c7a49c9 commit 84f3d97
Show file tree
Hide file tree
Showing 33 changed files with 943 additions and 653 deletions.
38 changes: 2 additions & 36 deletions cmd/master/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,7 @@ func main() {
// Creates server.
ctx, cancel := context.WithCancel(context.Background())

etcdAddress := master.Params.EtcdAddress()
etcdRootPath := master.Params.EtcdRootPath()
pulsarAddr := master.Params.PulsarAddress()
defaultRecordSize := master.Params.DefaultRecordSize()
minimumAssignSize := master.Params.MinimumAssignSize()
segmentThreshold := master.Params.SegmentThreshold()
segmentExpireDuration := master.Params.SegmentExpireDuration()
numOfChannel := master.Params.TopicNum()
nodeNum, _ := master.Params.QueryNodeNum()
statsChannel := master.Params.StatsChannels()

opt := master.Option{
KVRootPath: etcdRootPath,
MetaRootPath: etcdRootPath,
EtcdAddr: []string{etcdAddress},
PulsarAddr: pulsarAddr,
ProxyIDs: master.Params.ProxyIDList(),
PulsarProxyChannels: master.Params.ProxyTimeSyncChannels(),
PulsarProxySubName: master.Params.ProxyTimeSyncSubName(),
SoftTTBInterval: master.Params.SoftTimeTickBarrierInterval(),
WriteIDs: master.Params.WriteIDList(),
PulsarWriteChannels: master.Params.WriteTimeSyncChannels(),
PulsarWriteSubName: master.Params.WriteTimeSyncSubName(),
PulsarDMChannels: master.Params.DMTimeSyncChannels(),
PulsarK2SChannels: master.Params.K2STimeSyncChannels(),
DefaultRecordSize: defaultRecordSize,
MinimumAssignSize: minimumAssignSize,
SegmentThreshold: segmentThreshold,
SegmentExpireDuration: segmentExpireDuration,
NumOfChannel: numOfChannel,
NumOfQueryNode: nodeNum,
StatsChannels: statsChannel,
}

svr, err := master.CreateServer(ctx, &opt)
svr, err := master.CreateServer(ctx)
if err != nil {
log.Print("create server failed", zap.Error(err))
}
Expand All @@ -69,7 +35,7 @@ func main() {
cancel()
}()

if err := svr.Run(int64(master.Params.Port())); err != nil {
if err := svr.Run(int64(master.Params.Port)); err != nil {
log.Fatal("run server failed", zap.Error(err))
}

Expand Down
1 change: 1 addition & 0 deletions configs/advanced/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ master:
segment:
# old name: segmentThreshold: 536870912
size: 512 # MB
sizeFactor: 0.75
defaultSizePerRecord: 1024
minIDAssignCnt: 1024
maxIDAssignCnt: 16384
Expand Down
1 change: 1 addition & 0 deletions configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ master:
minimumAssignSize: 1048576
segmentThreshold: 536870912
segmentExpireDuration: 2000
segmentThresholdFactor: 0.75
querynodenum: 1
writenodenum: 1
statsChannels: "statistic"
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/query/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(MILVUS_QUERY_SRCS
visitors/ShowExprVisitor.cpp
visitors/ExecExprVisitor.cpp
Plan.cpp
Search.cpp
)
add_library(milvus_query ${MILVUS_QUERY_SRCS})
target_link_libraries(milvus_query milvus_proto)
107 changes: 107 additions & 0 deletions internal/core/src/query/Search.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#include "Search.h"
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include "segcore/Reduce.h"

#include <faiss/utils/distances.h>
#include "utils/tools.h"

namespace milvus::query {
static faiss::ConcurrentBitsetPtr
create_bitmap_view(std::optional<const BitmapSimple*> bitmaps_opt, int64_t chunk_id) {
if (!bitmaps_opt.has_value()) {
return nullptr;
}
auto& bitmaps = *bitmaps_opt.value();
auto& src_vec = bitmaps.at(chunk_id);
auto dst = std::make_shared<faiss::ConcurrentBitset>(src_vec.size());
boost::to_block_range(src_vec, dst->mutable_data());
return dst;
}

using namespace segcore;
Status
QueryBruteForceImpl(const SegmentSmallIndex& segment,
const query::QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
std::optional<const BitmapSimple*> bitmaps_opt,
QueryResult& results) {
auto& record = segment.get_insert_record();
auto& schema = segment.get_schema();
auto& indexing_record = segment.get_indexing_record();
// step 1: binary search to find the barrier of the snapshot
auto ins_barrier = get_barrier(record, timestamp);
auto max_chunk = upper_div(ins_barrier, DefaultElementPerChunk);
// auto del_barrier = get_barrier(deleted_record_, timestamp);

#if 0
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
Assert(bitmap_holder);
auto bitmap = bitmap_holder->bitmap_ptr;
#endif

// step 2.1: get meta
// step 2.2: get which vector field to search
auto vecfield_offset_opt = schema.get_offset(info.field_id_);
Assert(vecfield_offset_opt.has_value());
auto vecfield_offset = vecfield_offset_opt.value();
auto& field = schema[vecfield_offset];
auto vec_ptr = record.get_vec_entity<float>(vecfield_offset);

Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto topK = info.topK_;
auto total_count = topK * num_queries;
// TODO: optimize

// step 3: small indexing search
std::vector<int64_t> final_uids(total_count, -1);
std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());

auto max_indexed_id = indexing_record.get_finished_ack();
const auto& indexing_entry = indexing_record.get_indexing(vecfield_offset);
auto search_conf = indexing_entry.get_search_conf(topK);

for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
auto indexing = indexing_entry.get_indexing(chunk_id);
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto dataset = knowhere::GenDataset(num_queries, dim, src_data);
auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
auto ans = indexing->Query(dataset, search_conf, bitmap_view);
auto dis = ans->Get<float*>(milvus::knowhere::meta::DISTANCE);
auto uids = ans->Get<int64_t*>(milvus::knowhere::meta::IDS);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids);
}

// step 4: brute force search where small indexing is unavailable
for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) {
std::vector<int64_t> buf_uids(total_count, -1);
std::vector<float> buf_dis(total_count, std::numeric_limits<float>::max());

faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()};
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto nsize =
chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk;
auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf, bitmap_view);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
}

// step 5: convert offset to uids
for (auto& id : final_uids) {
if (id == -1) {
continue;
}
id = record.uids_[id];
}

results.result_ids_ = std::move(final_uids);
results.result_distances_ = std::move(final_dis);
results.topK_ = topK;
results.num_queries_ = num_queries;

return Status::OK();
}
} // namespace milvus::query
19 changes: 19 additions & 0 deletions internal/core/src/query/Search.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once
#include <optional>
#include "segcore/SegmentSmallIndex.h"
#include <boost/dynamic_bitset.hpp>

namespace milvus::query {
using BitmapChunk = boost::dynamic_bitset<>;
using BitmapSimple = std::deque<BitmapChunk>;

// note: c++17 don't support optional ref
Status
QueryBruteForceImpl(const segcore::SegmentSmallIndex& segment,
const QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
std::optional<const BitmapSimple*> bitmap_opt,
segcore::QueryResult& results);
} // namespace milvus::query
3 changes: 2 additions & 1 deletion internal/core/src/query/generated/ExecExprVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "segcore/SegmentSmallIndex.h"
#include <optional>
#include "query/ExprImpl.h"
#include "boost/dynamic_bitset.hpp"
#include "ExprVisitor.h"

namespace milvus::query {
Expand All @@ -22,7 +23,7 @@ class ExecExprVisitor : ExprVisitor {
visit(RangeExpr& expr) override;

public:
using RetType = std::vector<std::vector<bool>>;
using RetType = std::deque<boost::dynamic_bitset<>>;
explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
}
RetType
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/query/visitors/ExecExprVisitor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "segcore/SegmentSmallIndex.h"
#include <optional>
#include "query/ExprImpl.h"
#include "boost/dynamic_bitset.hpp"
#include "query/generated/ExecExprVisitor.h"

namespace milvus::query {
Expand All @@ -10,7 +11,7 @@ namespace milvus::query {
namespace impl {
class ExecExprVisitor : ExprVisitor {
public:
using RetType = std::vector<std::vector<bool>>;
using RetType = std::deque<boost::dynamic_bitset<>>;
explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
}
RetType
Expand Down Expand Up @@ -66,7 +67,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, Func func) -> RetT
auto& field_meta = schema[field_offset];
auto vec_ptr = records.get_scalar_entity<T>(field_offset);
auto& vec = *vec_ptr;
std::vector<std::vector<bool>> results(vec.chunk_size());
RetType results(vec.chunk_size());
for (auto chunk_id = 0; chunk_id < vec.chunk_size(); ++chunk_id) {
auto& result = results[chunk_id];
result.resize(segcore::DefaultElementPerChunk);
Expand Down
9 changes: 8 additions & 1 deletion internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "segcore/SegmentBase.h"
#include "query/generated/ExecPlanNodeVisitor.h"
#include "segcore/SegmentSmallIndex.h"
#include "query/generated/ExecExprVisitor.h"
#include "query/Search.h"

namespace milvus::query {

Expand Down Expand Up @@ -49,7 +51,12 @@ ExecPlanNodeVisitor::visit(FloatVectorANNS& node) {
auto& ph = placeholder_group_.at(0);
auto src_data = ph.get_blob<float>();
auto num_queries = ph.num_of_queries_;
segment->QueryBruteForceImpl(node.query_info_, src_data, num_queries, timestamp_, ret);
if (node.predicate_.has_value()) {
auto bitmap = ExecExprVisitor(*segment).call_child(*node.predicate_.value());
auto ptr = &bitmap;
QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, ptr, ret);
}
QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, std::nullopt, ret);
ret_ = ret;
}

Expand Down
1 change: 0 additions & 1 deletion internal/core/src/segcore/AckResponder.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,5 @@ class AckResponder {
std::shared_mutex mutex_;
std::set<int64_t> acks_ = {0};
std::atomic<int64_t> minimum_ = 0;
// std::atomic<int64_t> maximum_ = 0;
};
} // namespace milvus::segcore
1 change: 1 addition & 0 deletions internal/core/src/segcore/DeletedRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/Schema.h"
#include "knowhere/index/vector_index/IndexIVF.h"
#include <memory>
#include "segcore/Record.h"

namespace milvus::segcore {

Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/IndexingEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class IndexingRecord {

// concurrent
int64_t
get_finished_ack() {
get_finished_ack() const {
return finished_ack_.GetAck();
}

Expand Down
1 change: 1 addition & 0 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "common/Schema.h"
#include "ConcurrentVector.h"
#include "AckResponder.h"
#include "segcore/Record.h"

namespace milvus::segcore {
struct InsertRecord {
Expand Down
21 changes: 21 additions & 0 deletions internal/core/src/segcore/Record.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once
#include "common/Schema.h"

namespace milvus::segcore {
template <typename RecordType>
inline int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}
} // namespace milvus::segcore
5 changes: 1 addition & 4 deletions internal/core/src/segcore/SegmentBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ class SegmentBase {
virtual Status
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;

// query contains metadata of
virtual Status
QueryDeprecated(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results) = 0;

public:
virtual Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],
Expand Down
17 changes: 0 additions & 17 deletions internal/core/src/segcore/SegmentNaive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,6 @@ SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_r
// return Status::OK();
}

template <typename RecordType>
int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}

Status
SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
auto ins_barrier = get_barrier(record_, timestamp);
Expand Down
6 changes: 4 additions & 2 deletions internal/core/src/segcore/SegmentNaive.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ class SegmentNaive : public SegmentBase {
Status
Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;

// query contains metadata of
private:
// NOTE: now deprecated, remains for further copy out
Status
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results);

public:
Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],
Expand Down
Loading

0 comments on commit 84f3d97

Please sign in to comment.