Skip to content

Commit

Permalink
fix: merge insert operation fails if any of the payload columns are n…
Browse files Browse the repository at this point in the history
…on-nullable (#1899)

This is because we do an outer join and the output columns of an outer
join are always nullable. As a result we try and insert nullable data
and get a schema mismatch. This fix avoids this problem by restoring the
schema after we select the appropriate rows from the outer join output.
  • Loading branch information
westonpace authored Feb 1, 2024
1 parent 2bcba5e commit a0104bd
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ struct Merger {
delete_expr: Option<Arc<dyn PhysicalExpr>>,
// The parameters controlling the merge
params: MergeInsertParams,
// The schema of the dataset, used to recover nullability information
schema: Arc<Schema>,
}

impl Merger {
Expand All @@ -442,6 +444,7 @@ impl Merger {
deleted_rows: Arc::new(Mutex::new(RoaringTreemap::new())),
delete_expr,
params,
schema,
})
}

Expand Down Expand Up @@ -527,11 +530,25 @@ impl Merger {
let row_ids = matched.column(row_id_col).as_primitive::<UInt64Type>();
deleted_row_ids.extend(row_ids.values());
let matched = matched.project(&right_cols)?;
// The payload columns of an outer join are always nullable. We need to restore
// non-nullable to columns that were originally non-nullable. This should be safe
// since the not_matched rows should all be valid on the right_cols
//
// Sadly we can't use with_schema because it doesn't let you toggle nullability
let matched = RecordBatch::try_new(
self.schema.clone(),
Vec::from_iter(matched.columns().iter().cloned()),
)?;
batches.push(Ok(matched));
}
if self.params.insert_not_matched {
let not_matched = arrow::compute::filter_record_batch(&batch, &right_only)?;
let not_matched = not_matched.project(&right_cols)?;
// See comment above explaining this schema replacement
let not_matched = RecordBatch::try_new(
self.schema.clone(),
Vec::from_iter(not_matched.columns().iter().cloned()),
)?;
batches.push(Ok(not_matched));
}
match self.params.delete_not_matched_by_source {
Expand Down Expand Up @@ -642,8 +659,8 @@ mod tests {
#[tokio::test]
async fn test_basic_merge() {
let schema = Arc::new(Schema::new(vec![
Field::new("key", DataType::UInt32, true),
Field::new("value", DataType::UInt32, true),
Field::new("key", DataType::UInt32, false),
Field::new("value", DataType::UInt32, false),
]));

let batch = RecordBatch::try_new(
Expand Down

0 comments on commit a0104bd

Please sign in to comment.