[Bug]: milvus crashes when importing a parquet file with "required field" into a collection with nullable field #40291

yhmo opened this issue Mar 3, 2025 · 3 comments


yhmo commented Mar 3, 2025

Is there an existing issue for this?

  • I have searched the existing issues

Environment

- Milvus version: v2.5.4
- Deployment mode(standalone or cluster):
- MQ type(rocksmq, pulsar or kafka):    
- SDK version(e.g. pymilvus v2.0.0rc2): 2.5.4
- OS(Ubuntu or CentOS): 
- CPU/Memory: 
- GPU: 
- Others:

Current Behavior

Milvus crashes when importing a parquet file with a "required" (non-nullable) field into a collection with a nullable field.

Expected Behavior

No response

Steps To Reproduce

1. create a collection with a nullable field
2. generate a parquet file according to the collection schema, declaring the nullable field as "required" (non-nullable) in the parquet schema
3. call bulk_import() to import the parquet file
4. milvus crashes immediately

Milvus Log

[2025/03/03 04:02:20.177 +00:00] [INFO] [datacoord/import_checker.go:214] ["import job start to execute"] [jobID=456324347464752997] [jobTimeCost/pending=256.713014ms]
[2025/03/03 04:02:20.177 +00:00] [INFO] [datacoord/import_scheduler.go:177] ["processing pending preimport task..."] [taskID=456324347464753000] [jobID=456324347464752997] [collectionID=456324347464752989] [type=PreImportTask] [nodeID=0]
[2025/03/03 04:02:20.178 +00:00] [INFO] [datanode/services.go:436] ["datanode receive preimport request"] [traceID=89b5639204db8ef7abefaeef1e119b14] [taskID=456324347464753000] [jobID=456324347464752997] [collectionID=456324347464752989] [partitionIDs="[456324347464752990]"] [vchannels="[by-dev-rootcoord-dml_3_456324347464752989v0]"] [files="[{\"id\":456324347464752998,\"paths\":[\"1.parquet\"]}]"]
[2025/03/03 04:02:20.178 +00:00] [INFO] [datanode/services.go:450] ["datanode added preimport task"] [traceID=89b5639204db8ef7abefaeef1e119b14] [taskID=456324347464753000] [jobID=456324347464752997] [collectionID=456324347464752989] [partitionIDs="[456324347464752990]"] [vchannels="[by-dev-rootcoord-dml_3_456324347464752989v0]"] [files="[{\"id\":456324347464752998,\"paths\":[\"1.parquet\"]}]"]
[2025/03/03 04:02:20.185 +00:00] [INFO] [datacoord/import_scheduler.go:194] ["preimport task start to execute"] [taskID=456324347464753000] [jobID=456324347464752997] [collectionID=456324347464752989] [type=PreImportTask] [nodeID=0] [scheduledNodeID=1] [taskTimeCost/pending=32.231369ms]
[2025/03/03 04:02:20.680 +00:00] [DEBUG] [querynodev2/services.go:73] ["QueryNode current state"] [NodeID=1] [StateCode=Healthy]
[2025/03/03 04:02:20.952 +00:00] [DEBUG] [httpserver/handler_v2.go:255] ["high level restful api, read parameters from request body, then start to handle."] [traceID=573780e2c306418922a05090b240e87f] [url=/v2/vectordb/jobs/import/describe]
[2025/03/03 04:02:20.952 +00:00] [DEBUG] [httpserver/handler_v2.go:364] ["high level restful api, try to do a grpc call"] [traceID=573780e2c306418922a05090b240e87f]
[2025/03/03 04:02:20.952 +00:00] [INFO] [proxy/impl.go:6670] ["GetImportProgress received"] [traceID=573780e2c306418922a05090b240e87f] [jobID=456324347464752997]
[2025/03/03 04:02:20.953 +00:00] [INFO] [datacoord/services.go:1808] ["GetImportProgress done"] [traceID=573780e2c306418922a05090b240e87f] [jobID=456324347464752997] [resp="status:{} state:Importing progress:10 collection_name:\"AAA\" start_time:\"2025-03-03T04:02:19Z\""]
[2025/03/03 04:02:20.954 Z] [GIN] [/v2/vectordb/jobs/import/describe] [traceID=573780e2c306418922a05090b240e87f] [code=200] [latency=2.345839ms] [client=192.168.240.1] [method=POST] [error=]
[2025/03/03 04:02:20.987 +00:00] [INFO] [importv2/task_preimport.go:128] ["start to preimport"] [taskID=456324347464753000] [jobID=456324347464752997] [collectionID=456324347464752989] [type=PreImportTask] [bufferSize=16777216] [schema="name:\"AAA\" fields:{fieldID:100 name:\"id\" is_primary_key:true data_type:Int64} fields:{fieldID:101 name:\"vector\" data_type:FloatVector type_params:{key:\"dim\" value:\"4\"}} fields:{fieldID:102 name:\"flag\" data_type:Bool nullable:true}"]
[2025/03/03 04:02:20.999 +00:00] [INFO] [parquet/reader.go:65] ["create parquet reader done"] ["row group num"=1] ["num rows"=1]
[2025/03/03 04:02:21.003 +00:00] [INFO] [datacoord/channel_manager.go:613] ["Check ToWatch/ToRelease channel operations progress"] ["channel count"=2] ["channel names"="[by-dev-rootcoord-dml_3_456324347464752989v0,by-dev-rootcoord-dml_2_456324347464752297v0]"]
[2025/03/03 04:02:21.006 +00:00] [INFO] [datanode/services.go:384] ["DataNode receives CheckChannelOperationProgress"] [traceID=e4746433334f42a3e593163b7e000509] [channel=by-dev-rootcoord-dml_3_456324347464752989v0] [operation=ToWatch]
[2025/03/03 04:02:21.007 +00:00] [INFO] [datanode/services.go:384] ["DataNode receives CheckChannelOperationProgress"] [traceID=326bb4a941cd30dc0ec30adf879835c1] [channel=by-dev-rootcoord-dml_2_456324347464752297v0] [operation=ToRelease]
[2025/03/03 04:02:21.007 +00:00] [INFO] [datacoord/channel_manager.go:688] ["Got channel operation progress"] [opID=456324347853013050] [nodeID=1] ["check operation"=ToWatch] [channel=by-dev-rootcoord-dml_3_456324347464752989v0] ["got state"=WatchSuccess] [progress=100]
[2025/03/03 04:02:21.007 +00:00] [INFO] [datacoord/channel_manager.go:688] ["Got channel operation progress"] [opID=456324347853013051] [nodeID=1] ["check operation"=ToRelease] [channel=by-dev-rootcoord-dml_2_456324347464752297v0] ["got state"=ReleaseSuccess] [progress=0]
[2025/03/03 04:02:21.007 +00:00] [INFO] [datacoord/channel_manager.go:648] ["Finish to Check ToWatch/ToRelease channel operations progress"] ["channel count"=2] ["channel names"="[by-dev-rootcoord-dml_3_456324347464752989v0,by-dev-rootcoord-dml_2_456324347464752297v0]"]
[2025/03/03 04:02:21.027 +00:00] [DEBUG] [gc/gc_tuner.go:91] ["GC Tune done"] ["previous GOGC"=200] ["heapuse "=80] ["total memory"=205] ["next GC"=141] ["new GOGC"=200] [gc-pause=1.382065ms] [gc-pause-end=1740974541025274834]
[2025/03/03 04:02:21.066 +00:00] [ERROR] [conc/options.go:54] ["Conc pool panicked"] [panic="runtime error: index out of range [0] with length 0"] [stack="github.com/milvus-io/milvus/pkg/util/conc.(*poolOption).antsOptions.func1\n\t/workspace/source/pkg/util/conc/options.go:54\ngithub.com/panjf2000/ants/v2.(*goWorker).run.func1.1\n\t/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:54\nruntime.gopanic\n\t/go/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:770\ngithub.com/milvus-io/milvus/pkg/util/conc.(*Pool[...]).Submit.func1.1\n\t/workspace/source/pkg/util/conc/pool.go:75\nruntime.gopanic\n\t/go/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:770\nruntime.goPanicIndexU\n\t/go/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:120\ngithub.com/milvus-io/milvus/internal/util/importutilv2/parquet.bytesToBoolArray\n\t/workspace/source/internal/util/importutilv2/parquet/util.go:272\ngithub.com/milvus-io/milvus/internal/util/importutilv2/parquet.ReadNullableBoolData\n\t/workspace/source/internal/util/importutilv2/parquet/field_reader.go:262\ngithub.com/milvus-io/milvus/internal/util/importutilv2/parquet.(*FieldReader).Next\n\t/workspace/source/internal/util/importutilv2/parquet/field_reader.go:74\ngithub.com/milvus-io/milvus/internal/util/importutilv2/parquet.(*reader).Read\n\t/workspace/source/internal/util/importutilv2/parquet/reader.go:102\ngithub.com/milvus-io/milvus/internal/datanode/importv2.(*PreImportTask).readFileStat\n\t/workspace/source/internal/datanode/importv2/task_preimport.go:186\ngithub.com/milvus-io/milvus/internal/datanode/importv2.(*PreImportTask).Execute.func2\n\t/workspace/source/internal/datanode/importv2/task_preimport.go:146\ngithub.com/milvus-io/milvus/internal/datanode/importv2.(*PreImportTask).Execute.func3\n\t/workspace/source/internal/datanode/importv2/task_preimport.go:162\ngithub.com/milvus-io/milvus/pkg/util/conc.(*Pool[...]).Submit.func1\n\t/workspace/source/pkg/util/conc/pool.go:82\ngithub.com/panjf2000/ants/v2.(*goWorker).run.func1\n\t/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:67"]

Anything else?

No response

@yhmo added the kind/bug and needs-triage labels Mar 3, 2025

dosubot bot commented Mar 3, 2025

The crash you're experiencing when importing a Parquet file with "required" (non-null) fields into a Milvus collection with nullable fields seems to be related to an "index out of range" error in the bytesToBoolArray function. This function converts a byte array into a boolean array by processing each bit of the byte array. The error occurs when the length parameter exceeds the number of bits available in the bytes array, leading to an attempt to access an index that does not exist [1].

To address this issue, you might want to ensure that the Parquet file's schema and data align correctly with the collection's schema, particularly regarding the handling of nullable fields. Additionally, reviewing the implementation of the bytesToBoolArray function to handle cases where the byte array might be shorter than expected could help prevent such errors.
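
Below is a minimal Python sketch of the failure mode. The actual Milvus code is Go (bytesToBoolArray in internal/util/importutilv2/parquet/util.go, per the stack trace above); the names here are illustrative, not the real implementation. Parquet bit-packs boolean values, so a converter that trusts a caller-supplied row count can index past the end of the byte buffer:

def bytes_to_bool_array(length: int, packed: bytes) -> list[bool]:
    # Bit i of the packed buffer lives in byte i // 8 (LSB first). Trusting
    # `length` blindly reproduces the "index out of range [0] with length 0"
    # panic seen in the Milvus log above when `packed` is empty.
    return [(packed[i >> 3] >> (i & 7)) & 1 == 1 for i in range(length)]

# One row is expected, but the column produced an empty byte buffer:
bytes_to_bool_array(1, b"")  # raises IndexError: index out of range

A bounds check (length <= len(packed) * 8) that returns an error instead of panicking would turn the crash into a reportable import failure.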


@dosubot added the severity/critical label Mar 3, 2025

yhmo commented Mar 3, 2025

  1. Create a collection with a nullable field
from pymilvus import (
    MilvusClient, DataType, CollectionSchema, FieldSchema
)

client = MilvusClient(uri="http://localhost:19530")

collection_name = "AAA"
dim = 4
client.drop_collection(collection_name=collection_name)


fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dim),
    FieldSchema(name="flag", dtype=DataType.BOOL, nullable=True),
]

schema = CollectionSchema(fields, enable_dynamic_field=False)
client.create_collection(
    collection_name=collection_name,
    schema=schema,
)
print(f"Collection '{collection_name}' created")
  2. Download this zip and extract it to the /tmp path; there are two parquet files in the zip: 1.parquet and 2.parquet
    bulk_writer.zip

  3. Use this script to view the two files

import pyarrow.parquet as pq

parquet_file = pq.ParquetFile("/tmp/bulk_writer/2.parquet")
table = parquet_file.read()
print("=============================================================")
print(table)
print("=============================================================")
print(parquet_file.schema)
print(parquet_file.num_row_groups)

The parquet content of 1.parquet:

pyarrow.Table
id: int64 not null
flag: bool not null
vector: list<element: float>
  child 0, element: float
----
id: [[1]]
flag: [[true]]
vector: [[[0.1,0.2,0.3,0.4]]]
=============================================================
<pyarrow._parquet.ParquetSchema object at 0x7f07c862a740>
required group field_id=-1 schema {
  required int64 field_id=-1 id;
  required boolean field_id=-1 flag;
  optional group field_id=-1 vector (List) {
    repeated group field_id=-1 list {
      optional float field_id=-1 element;
    }
  }
}

The parquet content of 2.parquet:

pyarrow.Table
id: int64
flag: bool
vector: list<element: float>
  child 0, element: float
----
id: [[1]]
flag: [[true]]
vector: [[[0.1,0.2,0.3,0.4]]]
=============================================================
<pyarrow._parquet.ParquetSchema object at 0x7f004d7bb440>
required group field_id=-1 schema {
  optional int64 field_id=-1 id;
  optional boolean field_id=-1 flag;
  optional group field_id=-1 vector (List) {
    repeated group field_id=-1 list {
      optional float field_id=-1 element;
    }
  }
}

The difference is that "id" and "flag" are "not null" in 1.parquet:
In 1.parquet, the field "flag" is defined as "required".
In 2.parquet, the field "flag" is defined as "optional".
A minimal sketch of producing both variants is shown below.
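
For reference, here is a minimal pyarrow sketch of how the two variants can be produced. It illustrates the required/optional distinction and is not the exact script used to generate the attached files; the nullable flag on pa.field controls whether the parquet schema emits "required" or "optional":

import pyarrow as pa
import pyarrow.parquet as pq

data = {
    "id": [1],
    "flag": [True],
    "vector": [[0.1, 0.2, 0.3, 0.4]],
}

# nullable=False marks a column "required" in the parquet schema (as in 1.parquet);
# nullable=True (the default) marks it "optional" (as in 2.parquet).
required_schema = pa.schema([
    pa.field("id", pa.int64(), nullable=False),
    pa.field("flag", pa.bool_(), nullable=False),
    pa.field("vector", pa.list_(pa.float32())),
])
optional_schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("flag", pa.bool_()),
    pa.field("vector", pa.list_(pa.float32())),
])

pq.write_table(pa.table(data, schema=required_schema), "/tmp/bulk_writer/1.parquet")
pq.write_table(pa.table(data, schema=optional_schema), "/tmp/bulk_writer/2.parquet")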

  4. Use this script to import the two files respectively
from pymilvus.bulk_writer import (
    RemoteBulkWriter,
    bulk_import,
    get_import_progress,
)

import time
from minio import Minio
from typing import Any

def arg_parse(arg: Any):
    return arg[0] if isinstance(arg, tuple) else arg

connect_param = RemoteBulkWriter.S3ConnectParam(
    endpoint="0.0.0.0:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    bucket_name="a-bucket",
)
minio_client = Minio(
    endpoint=arg_parse(connect_param._endpoint),
    access_key=arg_parse(connect_param._access_key),
    secret_key=arg_parse(connect_param._secret_key),
    secure=arg_parse(connect_param._secure),
    session_token=arg_parse(connect_param._session_token),
    region=arg_parse(connect_param._region),
    http_client=arg_parse(connect_param._http_client),
    credentials=arg_parse(connect_param._credentials),
)

collection_name = "AAA"  # the collection created in step 1
file_name = "1.parquet"
minio_client.fput_object(
    bucket_name="a-bucket",
    object_name=file_name,
    file_path="/tmp/bulk_writer/" + file_name,
)
url = "http://127.0.0.1:19530"
resp = bulk_import(
    url=url,
    collection_name=collection_name,
    files=[[file_name]],
)
job_id = resp.json()['data']['jobId']
print(f"Create a bulkinsert job, job id: {job_id}")

while True:
    print("Wait 1 second to check bulkinsert job state...")
    time.sleep(1)

    print(f"\n===================== Get import job progress ====================")
    resp = get_import_progress(
        url=url,
        job_id=job_id,
    )

    state = resp.json()['data']['state']
    progress = resp.json()['data']['progress']
    if state == "Importing":
        print(f"The job {job_id} is importing... {progress}%")
        continue
    if state == "Failed":
        reason = resp.json()['data']['reason']
        print(f"The job {job_id} failed, reason: {reason}")
        break
    if state == "Completed" and progress == 100:
        print(f"The job {job_id} completed")
        break

Set file_name = "2.parquet": Milvus works well and the data is imported successfully.
Set file_name = "1.parquet": Milvus crashes immediately.

@yhmo self-assigned this Mar 3, 2025
@yanliang567 removed the needs-triage label Mar 3, 2025
@yanliang567 added this to the 2.5.6 milestone Mar 3, 2025
yanliang567 commented Mar 3, 2025

/assign @zhuwenxing
please check the testcase and add a new one if needed
/unassign

@yhmo changed the title from "[Bug]: milvus crashes when importing a parquet file with "optional field" into a collection with nullable field" to "[Bug]: milvus crashes when importing a parquet file with "required field" into a collection with nullable field" Mar 3, 2025
sre-ci-robot pushed a commit that referenced this issue Mar 5, 2025