From 764fbc4d223d336bf66de90fcb5162b19de83b77 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 13 Jun 2024 11:01:17 -0700 Subject: [PATCH] For review --- .../physical-plan/src/joins/sort_merge_join.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index e56ef32045b7..bf1d810baed8 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1116,10 +1116,13 @@ impl SMJStream { // NULLs on streamed side. // // Applicable only in case of Full join. + // + // If `output_not_matched_filter` is true, this will also produce record batches + // for buffered rows which are joined with streamed side but don't match join filter. fn freeze_buffered( &mut self, batch_count: usize, - output_join_filter_fail_batch: bool, + output_not_matched_filter: bool, ) -> Result<()> { if !matches!(self.join_type, JoinType::Full) { return Ok(()); @@ -1139,7 +1142,7 @@ impl SMJStream { buffered_batch.null_joined.clear(); // For buffered rows which are joined with streamed side but doesn't satisfy the join filter - if output_join_filter_fail_batch { + if output_not_matched_filter { let buffered_indices = UInt64Array::from_iter_values( buffered_batch.join_filter_failed_idxs.iter().copied(), ); @@ -1476,9 +1479,11 @@ fn produce_buffered_null_batch( .collect::>(); streamed_columns.extend(buffered_columns); - let columns = streamed_columns; - Ok(Some(RecordBatch::try_new(schema.clone(), columns)?)) + Ok(Some(RecordBatch::try_new( + schema.clone(), + streamed_columns, + )?)) } /// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]`