Skip to content

Commit

Permalink
fix: Fix a crash issue of bulkinsert (#40304)
Browse files Browse the repository at this point in the history
issue: #40291
pr: #40331

Signed-off-by: yhmo <[email protected]>
  • Loading branch information
yhmo authored Mar 5, 2025
1 parent 493b7a5 commit a1b9e3c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
18 changes: 9 additions & 9 deletions internal/util/importutilv2/parquet/field_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func ReadNullableBoolData(pcr *FieldReader, count int64) (any, []bool, error) {
validData = append(validData, make([]bool, dataNums)...)
data = append(data, make([]bool, dataNums)...)
} else {
validData = append(validData, bytesToBoolArray(dataNums, boolReader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, boolReader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
data = append(data, boolReader.Value(i))
}
Expand Down Expand Up @@ -357,37 +357,37 @@ func ReadNullableIntegerOrFloatData[T constraints.Integer | constraints.Float](p
switch chunk.DataType().ID() {
case arrow.INT8:
int8Reader := chunk.(*array.Int8)
validData = append(validData, bytesToBoolArray(dataNums, int8Reader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, int8Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
data = append(data, T(int8Reader.Value(i)))
}
case arrow.INT16:
int16Reader := chunk.(*array.Int16)
validData = append(validData, bytesToBoolArray(dataNums, int16Reader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, int16Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
data = append(data, T(int16Reader.Value(i)))
}
case arrow.INT32:
int32Reader := chunk.(*array.Int32)
validData = append(validData, bytesToBoolArray(dataNums, int32Reader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, int32Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
data = append(data, T(int32Reader.Value(i)))
}
case arrow.INT64:
int64Reader := chunk.(*array.Int64)
validData = append(validData, bytesToBoolArray(dataNums, int64Reader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, int64Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
data = append(data, T(int64Reader.Value(i)))
}
case arrow.FLOAT32:
float32Reader := chunk.(*array.Float32)
validData = append(validData, bytesToBoolArray(dataNums, float32Reader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, float32Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
data = append(data, T(float32Reader.Value(i)))
}
case arrow.FLOAT64:
float64Reader := chunk.(*array.Float64)
validData = append(validData, bytesToBoolArray(dataNums, float64Reader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, float64Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
data = append(data, T(float64Reader.Value(i)))
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func ReadNullableStringData(pcr *FieldReader, count int64) (any, []bool, error)
validData = append(validData, make([]bool, dataNums)...)
data = append(data, make([]string, dataNums)...)
} else {
validData = append(validData, bytesToBoolArray(dataNums, stringReader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, stringReader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
if stringReader.IsNull(i) {
data = append(data, "")
Expand Down Expand Up @@ -534,7 +534,7 @@ func ReadNullableVarcharData(pcr *FieldReader, count int64) (any, []bool, error)
validData = append(validData, make([]bool, dataNums)...)
data = append(data, make([]string, dataNums)...)
} else {
validData = append(validData, bytesToBoolArray(dataNums, stringReader.NullBitmapBytes())...)
validData = append(validData, bytesToValidData(dataNums, stringReader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ {
if stringReader.IsNull(i) {
data = append(data, "")
Expand Down
13 changes: 12 additions & 1 deletion internal/util/importutilv2/parquet/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,20 @@ func isSchemaEqual(schema *schemapb.CollectionSchema, arrSchema *arrow.Schema) e
}

// todo(smellthemoon): use byte to store valid_data
func bytesToBoolArray(length int, bytes []byte) []bool {
func bytesToValidData(length int, bytes []byte) []bool {
bools := make([]bool, 0, length)
if len(bytes) == 0 {
// parquet field is "optional" or "required"
// for "required" field, the arrow.array.NullBitmapBytes() returns an empty byte list
// which means all the elements are valid. In this case, we simply construct an all-true bool array
for i := 0; i < length; i++ {
bools = append(bools, true)
}
return bools
}

// for "optional" field, the arrow.array.NullBitmapBytes() returns a non-empty byte list
// with each bit representing the existence of an element
for i := 0; i < length; i++ {
bit := (bytes[uint(i)/8] & BitMask[byte(i)%8]) != 0
bools = append(bools, bit)
Expand Down

0 comments on commit a1b9e3c

Please sign in to comment.