fix some issues causing 137 oom #815

Merged: 1 commit, Feb 8, 2025
5 changes: 4 additions & 1 deletion .idea/vcs.xml

(Generated file; diff not rendered.)

7 changes: 5 additions & 2 deletions native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
@@ -20,7 +20,7 @@ use std::{
 };
 
 use arrow::{
-    array::{as_struct_array, make_array, Array, ArrayRef, StructArray},
+    array::{as_struct_array, make_array, new_empty_array, Array, ArrayRef, StructArray},
     datatypes::{DataType, Field, Schema, SchemaRef},
     ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
     record_batch::{RecordBatch, RecordBatchOptions},
@@ -123,6 +123,10 @@ impl PhysicalExpr for SparkUDFWrapperExpr {
         }
 
         let batch_schema = batch.schema();
+        let num_rows = batch.num_rows();
+        if num_rows == 0 {
+            return Ok(ColumnarValue::Array(new_empty_array(&self.return_type)));
+        }
 
         // init params schema
         let params_schema = self
@@ -140,7 +144,6 @@ impl PhysicalExpr for SparkUDFWrapperExpr {
             })?;
 
         // evaluate params
-        let num_rows = batch.num_rows();
         let params: Vec<ArrayRef> = self
             .params
             .iter()
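The zero-row early return above keeps empty batches from round-tripping through FFI into the JVM UDF. A minimal standalone sketch of the helper it leans on, assuming arrow-rs's `new_empty_array` (the surrounding `SparkUDFWrapperExpr` is omitted):

    use arrow::array::{new_empty_array, Array};
    use arrow::datatypes::DataType;

    fn main() {
        // new_empty_array builds a zero-length array of the declared return
        // type, so the expression can answer an empty batch without ever
        // invoking the Spark UDF.
        let empty = new_empty_array(&DataType::Int32);
        assert_eq!(empty.len(), 0);
        assert_eq!(empty.data_type(), &DataType::Int32);
    }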
41 changes: 13 additions & 28 deletions native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -30,7 +30,7 @@ use datafusion_ext_commons::{
         rdx_queue::{KeyForRadixQueue, RadixQueue},
         rdx_sort::radix_sort_by_key,
     },
-    batch_size, compute_suggested_batch_size_for_output, df_execution_err, downcast_any,
+    batch_size, compute_suggested_batch_size_for_output, df_execution_err,
     io::{read_bytes_slice, read_len, write_len},
 };
 use futures::lock::Mutex;
@@ -60,7 +60,6 @@ const SPILL_OFFHEAP_MEM_COST: usize = 200000;
 const NUM_SPILL_BUCKETS: usize = 64000;
 
 pub struct AggTable {
-    name: String,
     mem_consumer_info: Option<Weak<MemConsumerInfo>>,
     in_mem: Mutex<InMemTable>,
     spills: Mutex<Vec<Box<dyn Spill>>>,
@@ -71,14 +70,12 @@ pub struct AggTable {
 
 impl AggTable {
     pub fn new(agg_ctx: Arc<AggContext>, exec_ctx: Arc<ExecutionContext>) -> Self {
-        let name = format!("AggTable[partition={}]", exec_ctx.partition_id());
         let hashing_time = exec_ctx.register_timer_metric("hashing_time");
         let merging_time = exec_ctx.register_timer_metric("merging_time");
         let output_time = exec_ctx.register_timer_metric("output_time");
         Self {
             mem_consumer_info: None,
             in_mem: Mutex::new(InMemTable::new(
-                name.clone(),
                 0,
                 agg_ctx.clone(),
                 exec_ctx.clone(),
@@ -87,7 +84,6 @@ impl AggTable {
                 merging_time.clone(),
             )),
             spills: Mutex::default(),
-            name,
             agg_ctx,
             exec_ctx,
             output_time,
@@ -201,22 +197,24 @@ impl AggTable {
             return Ok(());
         }
 
-        // convert all tables into cursors
+        // write rest data into an in-memory buffer if in-mem data is small
+        // otherwise write into spill
         let mut spills = spills;
-        let mut cursors = vec![];
         if in_mem.num_records() > 0 {
-            let spill = tokio::task::spawn_blocking(|| {
-                let mut spill: Box<dyn Spill> = Box::new(vec![]);
+            let spill_metrics = self.exec_ctx.spill_metrics().clone();
+            let spill = tokio::task::spawn_blocking(move || {
+                let mut spill: Box<dyn Spill> = try_new_spill(&spill_metrics)?;
                 in_mem.try_into_spill(&mut spill)?; // spill staging records
                 Ok::<_, DataFusionError>(spill)
             })
             .await
-            .expect("tokio error")?;
-            let spill_size = downcast_any!(spill, Vec<u8>)?.len();
-            self.update_mem_used(spill_size + spills.len() * SPILL_OFFHEAP_MEM_COST)
+            .expect("tokio spawn_blocking error")?;
+            self.update_mem_used(spills.len() * SPILL_OFFHEAP_MEM_COST)
                 .await?;
             spills.push(spill);
         }
 
+        let mut cursors = vec![];
         for spill in &mut spills {
             cursors.push(RecordsSpillCursor::try_from_spill(spill, &self.agg_ctx)?);
         }
@@ -277,7 +275,7 @@ impl AggTable {
 #[async_trait]
 impl MemConsumer for AggTable {
     fn name(&self) -> &str {
-        &self.name
+        "AggTable"
     }
 
     fn set_consumer_info(&mut self, consumer_info: Weak<MemConsumerInfo>) {
@@ -336,7 +334,6 @@ pub enum InMemMode {
 
 /// Unordered in-mem hash table which can be updated
 pub struct InMemTable {
-    name: String,
     id: usize,
     agg_ctx: Arc<AggContext>,
     exec_ctx: Arc<ExecutionContext>,
@@ -347,7 +344,6 @@ pub struct InMemTable {
 
 impl InMemTable {
     fn new(
-        name: String,
         id: usize,
         agg_ctx: Arc<AggContext>,
         exec_ctx: Arc<ExecutionContext>,
@@ -356,7 +352,6 @@ impl InMemTable {
         merging_time: Time,
     ) -> Self {
         Self {
-            name,
             id,
             hashing_data: HashingData::new(agg_ctx.clone(), hashing_time),
             merging_data: MergingData::new(agg_ctx.clone(), merging_time),
@@ -367,23 +362,14 @@ impl InMemTable {
     }
 
     fn renew(&mut self, mode: InMemMode) -> Self {
-        let name = self.name.clone();
         let agg_ctx = self.agg_ctx.clone();
         let task_ctx = self.exec_ctx.clone();
         let id = self.id + 1;
         let hashing_time = self.hashing_data.hashing_time.clone();
         let merging_time = self.merging_data.merging_time.clone();
         std::mem::replace(
             self,
-            Self::new(
-                name,
-                id,
-                agg_ctx,
-                task_ctx,
-                mode,
-                hashing_time,
-                merging_time,
-            ),
+            Self::new(id, agg_ctx, task_ctx, mode, hashing_time, merging_time),
         )
     }

@@ -407,8 +393,7 @@ impl InMemTable {
         let cardinality_ratio = self.hashing_data.cardinality_ratio();
         if cardinality_ratio > self.agg_ctx.partial_skipping_ratio {
             log::warn!(
-                "{} cardinality ratio = {cardinality_ratio}, will trigger partial skipping",
-                self.name,
+                "AggTable cardinality ratio = {cardinality_ratio}, will trigger partial skipping",
             );
             return true;
         }
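Before this hunk, staging records were spilled into a `Vec<u8>` (`Box::new(vec![])`), keeping the whole spill on the heap right when the task is already under memory pressure; `try_new_spill` routes them through the regular spill mechanism instead. A standalone sketch of the idea, using plain std I/O and the `tempfile` crate rather than the crate's actual `Spill` trait:

    use std::io::{BufWriter, Result, Write};

    use tempfile::tempfile; // assumption: tempfile crate available

    fn spill_records(records: &[Vec<u8>]) -> Result<()> {
        // Old shape: `let mut spill: Vec<u8> = vec![];` kept every spilled
        // byte on the heap. Streaming to a temporary file keeps heap usage
        // flat regardless of how much data is spilled.
        let mut spill = BufWriter::new(tempfile()?);
        for rec in records {
            spill.write_all(rec)?;
        }
        spill.flush()
    }

    fn main() -> Result<()> {
        spill_records(&[vec![1, 2, 3], vec![4, 5]])
    }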
@@ -36,7 +36,7 @@ use datafusion_ext_commons::{
     arrow::{array_size::ArraySize, coalesce::coalesce_batches_unchecked},
     batch_size, df_execution_err, suggested_output_batch_mem_size,
 };
-use futures::{Stream, StreamExt};
+use futures::{executor::block_on_stream, Stream, StreamExt};
 use futures_util::FutureExt;
 use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
@@ -122,6 +122,33 @@ impl ExecutionContext {
             .counter(name.to_owned(), self.partition_id)
     }
 
+    pub fn spawn_worker_thread_on_stream(
+        self: &Arc<Self>,
+        input: SendableRecordBatchStream,
+    ) -> SendableRecordBatchStream {
+        let (batch_sender, mut batch_receiver) = tokio::sync::mpsc::channel(1);
+
+        tokio::task::spawn_blocking(move || {
+            let mut blocking_stream = block_on_stream(input);
+            while is_task_running()
+                && let Some(batch_result) = blocking_stream.next()
+            {
+                if batch_sender.blocking_send(batch_result).is_err() {
+                    break;
+                }
+            }
+        });
+
+        self.output_with_sender("WorkerThreadOnStream", move |sender| async move {
+            while is_task_running()
+                && let Some(batch_result) = batch_receiver.recv().await
+            {
+                sender.send(batch_result?).await;
+            }
+            Ok(())
+        })
+    }
+
     pub fn coalesce_with_default_batch_size(
         self: &Arc<Self>,
         input: SendableRecordBatchStream,
@@ -393,6 +420,9 @@ impl WrappedRecordBatchSender {
     }
 
     pub async fn send(&self, batch: RecordBatch) {
+        if batch.num_rows() == 0 {
+            return;
+        }
         let exclude_time = self.exclude_time.get().cloned();
         let send_time = exclude_time.as_ref().map(|_| Instant::now());
         self.sender
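`spawn_worker_thread_on_stream` drives a blocking input stream on a dedicated OS thread and hands batches back through a bounded channel of capacity 1, so a slow consumer can never let decoded batches pile up in memory; the new guard in `WrappedRecordBatchSender::send` additionally skips zero-row batches entirely. A self-contained sketch of the bridging pattern, with integers standing in for record batches and illustrative names throughout:

    use futures::{executor::block_on_stream, stream};

    #[tokio::main]
    async fn main() {
        // Stand-in for a SendableRecordBatchStream.
        let input = stream::iter(0..5);
        // Capacity 1: at most one in-flight batch between the threads.
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        tokio::task::spawn_blocking(move || {
            // Consume the stream synchronously on the worker thread.
            for item in block_on_stream(input) {
                if tx.blocking_send(item).is_err() {
                    break; // receiver dropped, stop early
                }
            }
        });

        while let Some(item) = rx.recv().await {
            println!("got batch {item}");
        }
    }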
6 changes: 3 additions & 3 deletions native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
@@ -149,14 +149,14 @@ fn read_ffi(
     Ok(exec_ctx
         .clone()
         .output_with_sender("FFIReader", move |sender| async move {
-            struct AutoCloseableExporer(GlobalRef);
-            impl Drop for AutoCloseableExporer {
+            struct AutoCloseableExporter(GlobalRef);
+            impl Drop for AutoCloseableExporter {
                 fn drop(&mut self) {
                     let _ = jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ());
                 }
             }
+            let exporter = AutoCloseableExporter(exporter);
 
-            let exporter = AutoCloseableExporer(exporter);
             loop {
                 let batch = {
                     // load batch from ffi
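Typo fix aside (`AutoCloseableExporer` to `AutoCloseableExporter`), the wrapper here is a Drop guard: the JVM-side exporter is closed on every exit path out of the async block, including errors. A generic sketch of the same pattern, with a `String` standing in for the JNI `GlobalRef`:

    struct AutoCloseableGuard(String);

    impl Drop for AutoCloseableGuard {
        fn drop(&mut self) {
            // In the PR this is a JNI call to JavaAutoCloseable.close().
            println!("closing {}", self.0);
        }
    }

    fn main() {
        let _guard = AutoCloseableGuard("exporter".to_string());
        // ... use the resource; drop() runs when _guard leaves scope,
        // no matter how the scope is exited.
    }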
36 changes: 22 additions & 14 deletions native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{any::Any, fmt, fmt::Formatter, sync::Arc};
+use std::{any::Any, fmt, fmt::Formatter, pin::Pin, sync::Arc};
 
 use arrow::{datatypes::SchemaRef, error::ArrowError};
 use blaze_jni_bridge::{jni_call_static, jni_new_global_ref, jni_new_string};
@@ -165,24 +165,16 @@ impl ExecutionPlan for OrcExec {
             fs_provider,
         };
 
-        let mut file_stream = Box::pin(FileStream::new(
+        let file_stream = Box::pin(FileStream::new(
             &self.base_config,
             partition,
             opener,
             exec_ctx.execution_plan_metrics(),
         )?);
-        let timed_stream =
-            exec_ctx
-                .clone()
-                .output_with_sender("OrcScan", move |sender| async move {
-                    sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute());
-                    let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer();
-                    while let Some(batch) = file_stream.next().await.transpose()? {
-                        sender.send(batch).await;
-                    }
-                    Ok(())
-                });
-        Ok(timed_stream)
+
+        let timed_stream = execute_orc_scan(file_stream, exec_ctx.clone())?;
+        let nonblock_stream = exec_ctx.spawn_worker_thread_on_stream(timed_stream);
+        Ok(exec_ctx.coalesce_with_default_batch_size(nonblock_stream))
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
@@ -194,6 +186,22 @@ impl ExecutionPlan for OrcExec {
     }
 }
 
+fn execute_orc_scan(
+    mut stream: Pin<Box<FileStream<OrcOpener>>>,
+    exec_ctx: Arc<ExecutionContext>,
+) -> Result<SendableRecordBatchStream> {
+    Ok(exec_ctx
+        .clone()
+        .output_with_sender("OrcScan", move |sender| async move {
+            sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute());
+            let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer();
+            while let Some(batch) = stream.next().await.transpose()? {
+                sender.send(batch).await;
+            }
+            Ok(())
+        }))
+}
+
 struct OrcOpener {
     projection: Vec<usize>,
     batch_size: usize,
5 changes: 3 additions & 2 deletions native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -226,8 +226,9 @@ impl ExecutionPlan for ParquetExec {
             file_stream = file_stream.with_on_error(OnError::Skip);
         }
 
-        let timed_stream = execute_parquet_scan(Box::pin(file_stream), exec_ctx)?;
-        Ok(timed_stream)
+        let timed_stream = execute_parquet_scan(Box::pin(file_stream), exec_ctx.clone())?;
+        let nonblock_stream = exec_ctx.spawn_worker_thread_on_stream(timed_stream);
+        Ok(exec_ctx.coalesce_with_default_batch_size(nonblock_stream))
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
2 changes: 2 additions & 0 deletions native-engine/datafusion-ext-plans/src/project_exec.rs
@@ -218,6 +218,8 @@ fn execute_project_with_filtering(
             .transpose()?
         {
             let output_batch = cached_expr_evaluator.filter_project(&batch)?;
+            drop(batch);
+
             exec_ctx
                 .baseline_metrics()
                 .record_output(output_batch.num_rows());
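Dropping the input batch as soon as `filter_project` has produced the output means input and output never coexist for the rest of the loop body, lowering peak memory per task. A toy sketch of the effect:

    fn project(input: Vec<u64>) -> Vec<u64> {
        let output: Vec<u64> = input.iter().map(|x| x * 2).collect();
        drop(input); // release the input before any further work
        output
    }

    fn main() {
        println!("{:?}", project(vec![1, 2, 3]));
    }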
@@ -27,7 +27,6 @@ use crate::{
 };
 
 pub struct RssSortShuffleRepartitioner {
-    name: String,
     mem_consumer_info: Option<Weak<MemConsumerInfo>>,
     data: Mutex<BufferedData>,
     rss: GlobalRef,
@@ -41,7 +40,6 @@ impl RssSortShuffleRepartitioner {
         sort_time: Time,
     ) -> Self {
         Self {
-            name: format!("RssSortShufflePartitioner[partition={}]", partition_id),
             mem_consumer_info: None,
             data: Mutex::new(BufferedData::new(partitioning, partition_id, sort_time)),
             rss: rss_partition_writer,
@@ -52,7 +50,7 @@ impl RssSortShuffleRepartitioner {
 #[async_trait]
 impl MemConsumer for RssSortShuffleRepartitioner {
     fn name(&self) -> &str {
-        &self.name
+        "RssSortShuffleRepartitioner"
    }
 
     fn set_consumer_info(&mut self, consumer_info: Weak<MemConsumerInfo>) {
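Same cleanup as in agg_table.rs: the per-instance `name` field built with `format!` is replaced by a static string, removing one `String` allocation per consumer; note the name no longer carries the partition id. In sketch form (hypothetical trait; the real `MemConsumer` has more methods):

    trait MemConsumer {
        fn name(&self) -> &str;
    }

    struct RssSortShuffleRepartitioner;

    impl MemConsumer for RssSortShuffleRepartitioner {
        fn name(&self) -> &str {
            // was: &self.name, where name = format!("...[partition={}]", id)
            "RssSortShuffleRepartitioner"
        }
    }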