Skip to content

Commit

Permalink
Optimize large memory usage of InsertRecord by using vector instead o…
Browse files Browse the repository at this point in the history
…f unordered_map if InsertRecord used in sealed segment (milvus-io#19245)

Signed-off-by: aoiasd <[email protected]>

Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored Sep 23, 2022
1 parent 22477d4 commit 2b58bd5
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 234 deletions.
2 changes: 1 addition & 1 deletion internal/core/src/query/SearchOnSealed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ SearchOnSealedIndex(const Schema& schema,

void
SearchOnSealed(const Schema& schema,
const segcore::InsertRecord& record,
const segcore::InsertRecord<true>& record,
const SearchInfo& search_info,
const void* query_data,
int64_t num_queries,
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/query/SearchOnSealed.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ SearchOnSealedIndex(const Schema& schema,

void
SearchOnSealed(const Schema& schema,
const segcore::InsertRecord& record,
const segcore::InsertRecord<true>& record,
const SearchInfo& search_info,
const void* query_data,
int64_t num_queries,
Expand Down
23 changes: 0 additions & 23 deletions internal/core/src/segcore/FieldIndexing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,6 @@ VectorFieldIndexing::get_search_params(int top_K) const {
return base_params;
}

void
IndexingRecord::UpdateResourceAck(int64_t chunk_ack, const InsertRecord& record) {
if (resource_ack_ >= chunk_ack) {
return;
}

std::unique_lock lck(mutex_);
int64_t old_ack = resource_ack_;
if (old_ack >= chunk_ack) {
return;
}
resource_ack_ = chunk_ack;
lck.unlock();

// std::thread([this, old_ack, chunk_ack, &record] {
for (auto& [field_offset, entry] : field_indexings_) {
auto vec_base = record.get_field_data_base(field_offset);
entry->BuildIndexRange(old_ack, chunk_ack, vec_base);
}
finished_ack_.AddSegment(old_ack, chunk_ack);
// }).detach();
}

template <typename T>
void
ScalarFieldIndexing<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) {
Expand Down
23 changes: 22 additions & 1 deletion internal/core/src/segcore/FieldIndexing.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,29 @@ class IndexingRecord {
}

// concurrent, reentrant
template <bool is_sealed>
void
UpdateResourceAck(int64_t chunk_ack, const InsertRecord& record);
UpdateResourceAck(int64_t chunk_ack, const InsertRecord<is_sealed>& record) {
if (resource_ack_ >= chunk_ack) {
return;
}

std::unique_lock lck(mutex_);
int64_t old_ack = resource_ack_;
if (old_ack >= chunk_ack) {
return;
}
resource_ack_ = chunk_ack;
lck.unlock();

// std::thread([this, old_ack, chunk_ack, &record] {
for (auto& [field_offset, entry] : field_indexings_) {
auto vec_base = record.get_field_data_base(field_offset);
entry->BuildIndexRange(old_ack, chunk_ack, vec_base);
}
finished_ack_.AddSegment(old_ack, chunk_ack);
// }).detach();
}

// concurrent
int64_t
Expand Down
75 changes: 0 additions & 75 deletions internal/core/src/segcore/InsertRecord.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,79 +9,4 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include <sys/timeb.h>
#include "InsertRecord.h"

namespace milvus::segcore {

InsertRecord::InsertRecord(const Schema& schema, int64_t size_per_chunk)
: row_ids_(size_per_chunk), timestamps_(size_per_chunk) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();

for (auto& field : schema) {
auto field_id = field.first;
auto& field_meta = field.second;
if (pk2offset_ == nullptr && pk_field_id.has_value() && pk_field_id.value() == field_id) {
switch (field_meta.get_data_type()) {
case DataType::INT64: {
pk2offset_ = std::make_unique<OffsetHashMap<int64_t>>();
break;
}
case DataType::VARCHAR: {
pk2offset_ = std::make_unique<OffsetHashMap<std::string>>();
break;
}
}
}
if (field_meta.is_vector()) {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
this->append_field_data<FloatVector>(field_id, field_meta.get_dim(), size_per_chunk);
continue;
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
this->append_field_data<BinaryVector>(field_id, field_meta.get_dim(), size_per_chunk);
continue;
} else {
PanicInfo("unsupported");
}
}
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
this->append_field_data<bool>(field_id, size_per_chunk);
break;
}
case DataType::INT8: {
this->append_field_data<int8_t>(field_id, size_per_chunk);
break;
}
case DataType::INT16: {
this->append_field_data<int16_t>(field_id, size_per_chunk);
break;
}
case DataType::INT32: {
this->append_field_data<int32_t>(field_id, size_per_chunk);
break;
}
case DataType::INT64: {
this->append_field_data<int64_t>(field_id, size_per_chunk);
break;
}
case DataType::FLOAT: {
this->append_field_data<float>(field_id, size_per_chunk);
break;
}
case DataType::DOUBLE: {
this->append_field_data<double>(field_id, size_per_chunk);
break;
}
case DataType::VARCHAR: {
this->append_field_data<std::string>(field_id, size_per_chunk);
break;
}
default: {
PanicInfo("unsupported");
}
}
}
}

} // namespace milvus::segcore
Loading

0 comments on commit 2b58bd5

Please sign in to comment.