Skip to content

Commit

Permalink
feat: [2.4] support the mmap_enable param in the field schema (#2239)
Browse files Browse the repository at this point in the history
- issue: milvus-io/milvus#35273
- pr: #2238

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Aug 27, 2024
1 parent fda125f commit 2bd4134
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 8 deletions.
6 changes: 2 additions & 4 deletions pymilvus/bulk_writer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,8 @@ def _persist_parquet(self, local_path: str, **kwargs):
buffer_row_count = kwargs.get("buffer_row_count", 1)
size_per_row = int(buffer_size / buffer_row_count) + 1
row_group_size = int(row_group_bytes / size_per_row)
if row_group_size < row_group_size_min:
row_group_size = row_group_size_min
if row_group_size > row_group_size_max:
row_group_size = row_group_size_max
row_group_size = max(row_group_size, row_group_size_min)
row_group_size = min(row_group_size, row_group_size_max)

# write to Parquet file
data_frame = pd.DataFrame(data=data)
Expand Down
5 changes: 5 additions & 0 deletions pymilvus/client/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def __pack(self, raw: Any):

self.params[type_param.key] = json.loads(type_param.value)
else:
if type_param.key in ["mmap.enabled"]:
self.params["mmap_enabled"] = (
bool(type_param.value) if type_param.value.lower() != "false" else False
)
continue
self.params[type_param.key] = type_param.value
if type_param.key in ["dim"]:
self.params[type_param.key] = int(type_param.value)
Expand Down
11 changes: 9 additions & 2 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def get_schema_from_collection_schema(
is_clustering_key=f.is_clustering_key,
)
for k, v in f.params.items():
kv_pair = common_types.KeyValuePair(key=str(k), value=str(v))
kv_pair = common_types.KeyValuePair(
key=str(k) if k != "mmap_enabled" else "mmap.enabled", value=str(v)
)
field_schema.type_params.append(kv_pair)

schema.fields.append(field_schema)
Expand Down Expand Up @@ -187,7 +189,12 @@ def get_field_schema(
type_params = field.get("params", {})
if not isinstance(type_params, dict):
raise ParamError(message="params should be dictionary type")
kvs = [common_types.KeyValuePair(key=str(k), value=str(v)) for k, v in type_params.items()]
kvs = [
common_types.KeyValuePair(
key=str(k) if k != "mmap_enabled" else "mmap.enabled", value=str(v)
)
for k, v in type_params.items()
]
field_schema.type_params.extend(kvs)

return field_schema, primary_field, auto_id_field
Expand Down
3 changes: 1 addition & 2 deletions pymilvus/orm/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ def extend_batch_size(batch_size: int, next_param: dict, to_extend_batch_size: b
extend_rate = DEFAULT_SEARCH_EXTENSION_RATE
if EF in next_param[PARAMS]:
real_batch = min(MAX_BATCH_SIZE, batch_size * extend_rate, next_param[PARAMS][EF])
if next_param[PARAMS][EF] > real_batch:
next_param[PARAMS][EF] = real_batch
next_param[PARAMS][EF] = min(real_batch, next_param[PARAMS][EF])
return real_batch
return min(MAX_BATCH_SIZE, batch_size * extend_rate)

Expand Down
2 changes: 2 additions & 0 deletions pymilvus/orm/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ def __init__(self, name: str, dtype: DataType, description: str = "", **kwargs)
self.is_partition_key = kwargs.get("is_partition_key", False)
self.is_clustering_key = kwargs.get("is_clustering_key", False)
self.element_type = kwargs.get("element_type", None)
if "mmap_enabled" in kwargs:
self._type_params["mmap_enabled"] = kwargs["mmap_enabled"]
self._parse_type_params()

def __repr__(self) -> str:
Expand Down

0 comments on commit 2bd4134

Please sign in to comment.