diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index e56ef32045b71..d35dda0dd696b 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1116,10 +1116,13 @@ impl SMJStream { // NULLs on streamed side. // // Applicable only in case of Full join. + // + // If `output_not_matched_filter` is true, this will also produce record batches + // for buffered rows which are joined with streamed side but don't match join filter. fn freeze_buffered( &mut self, batch_count: usize, - output_join_filter_fail_batch: bool, + output_not_matched_filter: bool, ) -> Result<()> { if !matches!(self.join_type, JoinType::Full) { return Ok(()); @@ -1139,7 +1142,7 @@ impl SMJStream { buffered_batch.null_joined.clear(); // For buffered rows which are joined with streamed side but doesn't satisfy the join filter - if output_join_filter_fail_batch { + if output_not_matched_filter { let buffered_indices = UInt64Array::from_iter_values( buffered_batch.join_filter_failed_idxs.iter().copied(), ); @@ -1475,10 +1478,10 @@ fn produce_buffered_null_batch( .map(|f| new_null_array(f.data_type(), buffered_indices.len())) .collect::<Vec<_>>(); - streamed_columns.extend(buffered_columns); - let columns = streamed_columns; - - Ok(Some(RecordBatch::try_new(schema.clone(), columns)?)) + Ok(Some(RecordBatch::try_new( + schema.clone(), + streamed_columns, + )?)) } /// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]`