From 44f4be20b4753f481caeea81f24ddc45a8821075 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 12 Dec 2024 09:04:42 -0500 Subject: [PATCH] Minor: Add doc example to RecordBatchStreamAdapter (#13725) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Minor: Add doc example to RecordBatchStreamAdapter * Update datafusion/physical-plan/src/stream.rs Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> --------- Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> --- datafusion/physical-plan/src/stream.rs | 27 ++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index b3054299b7f7..a05b46d22840 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -337,7 +337,9 @@ impl RecordBatchReceiverStream { pin_project! { /// Combines a [`Stream`] with a [`SchemaRef`] implementing - /// [`RecordBatchStream`] for the combination + /// [`SendableRecordBatchStream`] for the combination + /// + /// See [`Self::new`] for an example pub struct RecordBatchStreamAdapter { schema: SchemaRef, @@ -347,7 +349,28 @@ pin_project! { } impl RecordBatchStreamAdapter { - /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream + /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream. + /// + /// Note to create a [`SendableRecordBatchStream`] you pin the result + /// + /// # Example + /// ``` + /// # use arrow::array::record_batch; + /// # use datafusion_execution::SendableRecordBatchStream; + /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter; + /// // Create stream of Result + /// let batch = record_batch!( + /// ("a", Int32, [1, 2, 3]), + /// ("b", Float64, [Some(4.0), None, Some(5.0)]) + /// ).expect("created batch"); + /// let schema = batch.schema(); + /// let stream = futures::stream::iter(vec![Ok(batch)]); + /// // Convert the stream to a SendableRecordBatchStream + /// let adapter = RecordBatchStreamAdapter::new(schema, stream); + /// // Now you can use the adapter as a SendableRecordBatchStream + /// let batch_stream: SendableRecordBatchStream = Box::pin(adapter); + /// // ... + /// ``` pub fn new(schema: SchemaRef, stream: S) -> Self { Self { schema, stream } }