Commit

test: refine import test (#33691)

zhuwenxing authored Jun 7, 2024
1 parent b78d7ed commit 29efd69
Showing 3 changed files with 131 additions and 543 deletions.
94 changes: 64 additions & 30 deletions tests/python_client/common/bulk_insert_data.py
@@ -7,6 +7,8 @@
 import jax.numpy as jnp
 import pandas as pd
 import random
+from pathlib import Path
+import uuid
 from faker import Faker
 from sklearn import preprocessing
 from common.common_func import gen_unique_str
@@ -670,20 +672,30 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d


 def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False, **kwargs):
+    schema = kwargs.get("schema", None)
+    dir_prefix = f"json-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{dir_prefix}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
     files = []
     if file_size is not None:
         rows = 5000
     start_uid = 0
     for i in range(file_nums):
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
-        file = f"{data_source}/{file_name}"
+        file = f"{data_source_new}/{file_name}"
+        Path(file).parent.mkdir(parents=True, exist_ok=True)
         data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field, **kwargs)
         # log.info(f"data: {data}")
         with open(file, "w") as f:
             json.dump(data, f)
         # get the file size
         if file_size is not None:
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB")
             # calculate the rows to be generated
             total_batch = int(file_size*1024*1024*1024/batch_file_size)
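The hunk above gives each call to gen_new_json_files its own json-<uuid> directory under data_source and, when a schema object is passed through **kwargs, drops it into schema.json alongside the generated data; later in the diff the returned file paths are made relative to data_source. A minimal sketch of that pattern follows; base_dir, rows, schema_dict, and write_json_batch are illustrative stand-ins, not names from this repository.

# Illustrative sketch only: one unique output directory per call, with the
# schema saved next to the generated data. All names here are hypothetical.
import json
import uuid
from pathlib import Path


def write_json_batch(base_dir, rows, schema_dict=None):
    dir_prefix = f"json-{uuid.uuid4()}"              # isolates this call's output
    out_dir = Path(base_dir) / dir_prefix
    out_dir.mkdir(parents=True, exist_ok=True)
    if schema_dict is not None:
        with open(out_dir / "schema.json", "w") as f:
            json.dump(schema_dict, f)                # schema travels with the data
    data_file = out_dir / "data.json"
    with open(data_file, "w") as f:
        json.dump(rows, f)
    # return paths relative to base_dir, mirroring how the generators report files
    return [f"{dir_prefix}/{data_file.name}"]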
@@ -693,17 +705,27 @@ def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_
             for _ in range(total_batch):
                 all_data += data
             file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
-            with open(f"{data_source}/{file_name}", "w") as f:
+            with open(f"{data_source_new}/{file_name}", "w") as f:
                 json.dump(all_data, f)
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024/1024} GB")
         files.append(file_name)
         start_uid += rows
+    files = [f"{dir_prefix}/{f}" for f in files]
     return files


-def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False, include_meta=True):
+def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False, include_meta=True, **kwargs):
     # gen numpy files
+    schema = kwargs.get("schema", None)
+    u_id = f"numpy-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{u_id}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
     files = []
     start_uid = 0
     if file_nums == 1:
@@ -723,47 +745,47 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 if "fp16" in data_field:
                     float_vector = True
                     vector_type = "fp16"
-                file_name = gen_vectors_in_numpy_file(dir=data_source, data_field=data_field, float_vector=float_vector,
+                file_name = gen_vectors_in_numpy_file(dir=data_source_new, data_field=data_field, float_vector=float_vector,
                                                       vector_type=vector_type, rows=rows, dim=dim, force=force)
             elif data_field == DataField.string_field:  # string field for numpy not supported yet at 2022-10-17
-                file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             elif data_field == DataField.bool_field:
-                file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_bool_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             elif data_field == DataField.json_field:
-                file_name = gen_json_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             else:
-                file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field,
+                file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
                                                            rows=rows, force=force)
             files.append(file_name)
         if enable_dynamic_field and include_meta:
-            file_name = gen_dynamic_field_in_numpy_file(dir=data_source, rows=rows, force=force)
+            file_name = gen_dynamic_field_in_numpy_file(dir=data_source_new, rows=rows, force=force)
             files.append(file_name)
         if file_size is not None:
             batch_file_size = 0
             for file_name in files:
-                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+                batch_file_size += os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {rows} for {files}: {batch_file_size/1024/1024} MB")
             # calculate the rows to be generated
             total_batch = int(file_size*1024*1024*1024/batch_file_size)
             total_rows = total_batch * rows
             new_files = []
             for f in files:
-                arr = np.load(f"{data_source}/{f}")
+                arr = np.load(f"{data_source_new}/{f}")
                 all_arr = np.concatenate([arr for _ in range(total_batch)], axis=0)
                 file_name = f
-                np.save(f"{data_source}/{file_name}", all_arr)
+                np.save(f"{data_source_new}/{file_name}", all_arr)
                 log.info(f"file_name: {file_name} data type: {all_arr.dtype} data shape: {all_arr.shape}")
                 new_files.append(file_name)
             files = new_files
             batch_file_size = 0
             for file_name in files:
-                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+                batch_file_size += os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {total_rows} for {files}: {batch_file_size/1024/1024/1024} GB")

     else:
         for i in range(file_nums):
-            subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
-            dir = f"{data_source}/{subfolder}"
+            subfolder = gen_subfolder(root=data_source_new, dim=dim, rows=rows, file_num=i)
+            dir = f"{data_source_new}/{subfolder}"
             for data_field in data_fields:
                 if DataField.vec_field in data_field:
                     file_name = gen_vectors_in_numpy_file(dir=dir, data_field=data_field, float_vector=float_vector, rows=rows, dim=dim, force=force)
@@ -774,6 +796,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 file_name = gen_dynamic_field_in_numpy_file(dir=dir, rows=rows, start=start_uid, force=force)
                 files.append(f"{subfolder}/{file_name}")
             start_uid += rows
+    files = [f"{u_id}/{f}" for f in files]
     return files
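When file_size is set, the numpy branch above writes one batch, measures it on disk, and repeats the arrays enough times to approach the target size (file_size is interpreted as GB, per the *1024*1024*1024 factor). Below is a self-contained sketch of that scaling step, assuming the single batch is already smaller than the target and that npy_path ends with ".npy"; the function name is hypothetical.

# Sketch of the size-targeting logic used by the generators: measure one saved
# batch, then repeat it until the file reaches roughly target_gb gigabytes.
import os
import numpy as np


def grow_npy_to_target(npy_path, target_gb):
    batch_bytes = os.path.getsize(npy_path)
    total_batch = int(target_gb * 1024 * 1024 * 1024 / batch_bytes)
    arr = np.load(npy_path)
    all_arr = np.concatenate([arr for _ in range(total_batch)], axis=0)
    np.save(npy_path, all_arr)                       # overwrite with the enlarged array
    return total_batch * arr.shape[0]                # rows after scaling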


@@ -784,7 +807,17 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0):
     return data


-def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="doc"):
+def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="doc", **kwargs):
+    schema = kwargs.get("schema", None)
+    u_id = f"parquet-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{u_id}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
+
     # gen numpy files
     if err_type == "":
         err_type = "none"
@@ -805,12 +838,12 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
         log.info(f"df: \n{df}")
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet"
         if row_group_size is not None:
-            df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
+            df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
         else:
-            df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
+            df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow')
         # get the file size
         if file_size is not None:
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB")
             # calculate the rows to be generated
             total_batch = int(file_size*1024*1024*1024/batch_file_size)
@@ -819,10 +852,10 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
             file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet"
             log.info(f"all df: \n {all_df}")
             if row_group_size is not None:
-                all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
+                all_df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
             else:
-                all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+                all_df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow')
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024} MB")
         files.append(file_name)
     else:
@@ -837,11 +870,12 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
             df = pd.DataFrame(all_field_data)
             file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-error-{err_type}-{int(time.time())}.parquet"
             if row_group_size is not None:
-                df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
+                df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
             else:
-                df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
+                df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow')
             files.append(file_name)
             start_uid += rows
+    files = [f"{u_id}/{f}" for f in files]
     return files
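The parquet branches above now write into the per-call parquet-<uuid> directory and, when row_group_size is given, forward it to pyarrow. A small standalone sketch of that write path; the output directory, column names, and sizes are placeholders rather than repository fixtures.

# Hedged example of DataFrame.to_parquet with an explicit pyarrow row group size;
# out_dir and the generated columns stand in for data_source_new and real fields.
import time
from pathlib import Path

import numpy as np
import pandas as pd

out_dir = Path("/tmp/parquet-demo")                  # stand-in for data_source_new
out_dir.mkdir(parents=True, exist_ok=True)
df = pd.DataFrame({
    "uid": np.arange(100),
    "float_vector": [np.random.random(8).tolist() for _ in range(100)],
})
file_name = f"data-fields-2-rows-100-dim-8-file-num-1-error-none-{int(time.time())}.parquet"
df.to_parquet(out_dir / file_name, engine="pyarrow", row_group_size=10)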


@@ -931,7 +965,7 @@ def prepare_bulk_insert_new_json_files(minio_endpoint="", bucket_name="milvus-bu


 def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False, file_size=None,
-                                    data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True):
+                                    data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, **kwargs):
     """
     Generate column based files based on params in numpy format and copy them to the minio
     Note: each field in data_fields would be generated one numpy file.
@@ -963,14 +997,14 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
     """
     files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector, file_size=file_size,
                           data_fields=data_fields, enable_dynamic_field=enable_dynamic_field,
-                          file_nums=file_nums, force=force, include_meta=include_meta)
+                          file_nums=file_nums, force=force, include_meta=include_meta, **kwargs)

     copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
     return files
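Because gen_npy_files now returns paths relative to data_source and prefixed with the per-call numpy-<uuid> directory, uploading each file with that relative path as its object name reproduces the same layout in the bucket. The sketch below is not the repository's copy_files_to_minio; it is a hypothetical helper using the minio SDK, with the endpoint and credentials as placeholders.

# Hypothetical upload helper: using the relative path as the object key keeps
# the per-call uuid directory structure inside the bucket. Not the repo's code.
from minio import Minio


def upload_relative_files(endpoint, bucket_name, root, rel_files):
    client = Minio(endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
    for rel_path in rel_files:
        # e.g. rel_path == "numpy-<uuid>/float_vector.npy"
        client.fput_object(bucket_name, rel_path, f"{root}/{rel_path}")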


 def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None, row_group_size=None,
-                                      enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="doc"):
+                                      enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="doc", **kwargs):
     """
     Generate column based files based on params in parquet format and copy them to the minio
     Note: each field in data_fields would be generated one parquet file.
@@ -1002,7 +1036,7 @@ def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-buc
     """
     files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field,
                               data_fields=data_fields, array_length=array_length, file_size=file_size, row_group_size=row_group_size,
-                              file_nums=file_nums, include_meta=include_meta, sparse_format=sparse_format)
+                              file_nums=file_nums, include_meta=include_meta, sparse_format=sparse_format, **kwargs)
     copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
     return files
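With the **kwargs plumbing above, callers can hand a schema object to the prepare_* helpers and it reaches the generators through kwargs.get("schema"); the object only needs a to_dict() method. A hypothetical call site follows, with the endpoint and schema values as placeholders.

# Hypothetical usage: collection_schema (e.g. a pymilvus CollectionSchema, which
# exposes to_dict()) and the endpoint below are placeholders.
# Assumed imports for this test package layout:
# from common.bulk_insert_data import prepare_bulk_insert_parquet_files, DataField
files = prepare_bulk_insert_parquet_files(
    minio_endpoint="127.0.0.1:9000",
    bucket_name="milvus-bucket",
    rows=100,
    dim=128,
    data_fields=[DataField.vec_field],
    float_vector=True,
    file_nums=1,
    schema=collection_schema,   # forwarded via **kwargs to gen_parquet_files
)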
