
Commit

matthewmturner committed May 31, 2024
1 parent 4d7bbd5 commit 1774aac
Showing 3 changed files with 17 additions and 24 deletions.
1 change: 0 additions & 1 deletion datafusion-examples/Cargo.toml
@@ -83,4 +83,3 @@ uuid = "1.7"
 
 [target.'cfg(not(target_os = "windows"))'.dev-dependencies]
 nix = { version = "0.28.0", features = ["fs"] }
-
19 changes: 10 additions & 9 deletions datafusion-examples/examples/file_stream_provider.rs
@@ -31,7 +31,7 @@ use futures::StreamExt;
 use nix::sys::stat;
 use nix::unistd;
 use tempfile::TempDir;
-use tokio::task::{spawn_blocking, JoinHandle};
+use tokio::task::JoinSet;
 
 use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use datafusion::datasource::TableProvider;
@@ -94,13 +94,14 @@ fn create_writing_thread(
     lines: Vec<String>,
     waiting_lock: Arc<AtomicBool>,
     wait_until: usize,
-) -> JoinHandle<()> {
+    tasks: &mut JoinSet<()>,
+) {
     // Timeout for a long period of BrokenPipe error
     let broken_pipe_timeout = Duration::from_secs(10);
     let sa = file_path.clone();
     // Spawn a new thread to write to the FIFO file
     #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
-    spawn_blocking(move || {
+    tasks.spawn_blocking(move || {
         let file = OpenOptions::new().write(true).open(sa).unwrap();
         // Reference time to use when deciding to fail the test
         let execution_start = Instant::now();
@@ -114,7 +115,7 @@
             write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
         }
         drop(file);
-    })
+    });
 }
 
 /// This example demonstrates a scanning against an Arrow data source (JSON) and
@@ -130,21 +131,22 @@ async fn main() -> Result<()> {
     let tmp_dir = TempDir::new()?;
     let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
 
-    let mut tasks: Vec<JoinHandle<()>> = vec![];
+    let mut tasks: JoinSet<()> = JoinSet::new();
     let waiting = Arc::new(AtomicBool::new(true));
 
     let data_iter = 0..TEST_DATA_SIZE;
     let lines = data_iter
         .map(|i| format!("{},{}\n", i, i + 1))
         .collect::<Vec<_>>();
-    // Create writing threads for the left and right FIFO files
-    tasks.push(create_writing_thread(
+
+    create_writing_thread(
         fifo_path.clone(),
         Some("a1,a2\n".to_owned()),
         lines.clone(),
         waiting.clone(),
         TEST_DATA_SIZE,
-    ));
+        &mut tasks,
+    );
 
     // Create schema
     let schema = Arc::new(Schema::new(vec![
@@ -161,7 +163,6 @@
     let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
     let mut stream = df.execute_stream().await.unwrap();
 
-    futures::future::join_all(tasks).await;
     let mut batches = Vec::new();
     if let Some(Ok(batch)) = stream.next().await {
        batches.push(batch)
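The file_stream_provider.rs change above swaps a hand-rolled `Vec<JoinHandle<()>>` plus `futures::future::join_all` for a `tokio::task::JoinSet`, which owns the spawned writer tasks as a group and is simply passed into `create_writing_thread` as `&mut tasks`. A minimal standalone sketch of that API, assuming a plain Tokio binary (the task bodies and the `join_next` drain loop are illustrative, not copied from the example):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // A JoinSet owns every task spawned onto it, so no JoinHandles
    // need to be collected by hand.
    let mut tasks: JoinSet<()> = JoinSet::new();

    for i in 0..3 {
        // spawn_blocking runs the closure on Tokio's blocking pool, which is
        // the right place for synchronous FIFO/file writes like the example's.
        tasks.spawn_blocking(move || {
            println!("writer {i} done");
        });
    }

    // join_next yields each task's result as it finishes; draining the set
    // replaces the old futures::future::join_all over a Vec of handles.
    while let Some(result) = tasks.join_next().await {
        result.expect("writer task panicked");
    }
}
```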
21 changes: 7 additions & 14 deletions datafusion/core/src/datasource/stream.rs
@@ -110,9 +110,7 @@ impl FromStr for StreamEncoding {
 /// responsible for providing a `RecordBatchReader` and optionally a `RecordBatchWriter`.
 pub trait StreamProvider: std::fmt::Debug + Send + Sync {
     /// Get a reference to the schema for this stream
-    fn schema(&self) -> SchemaRef;
-    /// Needed for `PartitionStream` - maybe there is a better way to do this.
-    fn schema_ref(&self) -> &SchemaRef;
+    fn schema(&self) -> &SchemaRef;
     /// Provide `RecordBatchReader`
     fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
     /// Provide `RecordBatchWriter`
@@ -182,12 +180,7 @@ impl FileStreamProvider {
 }
 
 impl StreamProvider for FileStreamProvider {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    // Needed for `PartitionStream`
-    fn schema_ref(&self) -> &SchemaRef {
+    fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 
@@ -339,11 +332,11 @@ impl TableProvider for StreamTable {
                 let projected = self.0.source.schema().project(p)?;
                 create_ordering(&projected, &self.0.order)?
             }
-            None => create_ordering(self.0.source.schema_ref(), &self.0.order)?,
+            None => create_ordering(self.0.source.schema(), &self.0.order)?,
         };
 
         Ok(Arc::new(StreamingTableExec::try_new(
-            self.0.source.schema(),
+            self.0.source.schema().clone(),
             vec![Arc::new(StreamRead(self.0.clone())) as _],
             projection,
             projected_schema,
@@ -360,7 +353,7 @@
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let ordering = match self.0.order.first() {
             Some(x) => {
-                let schema = self.0.source.schema_ref();
+                let schema = self.0.source.schema();
                 let orders = create_ordering(schema, std::slice::from_ref(x))?;
                 let ordering = orders.into_iter().next().unwrap();
                 Some(ordering.into_iter().map(Into::into).collect())
@@ -371,7 +364,7 @@
         Ok(Arc::new(DataSinkExec::new(
             input,
             Arc::new(StreamWrite(self.0.clone())),
-            self.0.source.schema(),
+            self.0.source.schema().clone(),
             ordering,
         )))
     }
@@ -381,7 +374,7 @@ struct StreamRead(Arc<StreamConfig>);
 
 impl PartitionStream for StreamRead {
     fn schema(&self) -> &SchemaRef {
-        self.0.source.schema_ref()
+        self.0.source.schema()
     }
 
     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
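The stream.rs change collapses the trait's two accessors (`schema`, which cloned the Arc, and `schema_ref`, which borrowed it) into a single `fn schema(&self) -> &SchemaRef`; call sites that need an owned `SchemaRef` now clone the returned Arc explicitly. A rough sketch of that shape, with a simplified trait and a hypothetical `FifoProvider` standing in for `FileStreamProvider` (the real trait also has `reader`/`writer` methods and `Debug + Send + Sync` bounds, and the `arrow_schema` import is an assumption):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

/// Simplified stand-in for DataFusion's `StreamProvider` after this commit:
/// one accessor that borrows the schema instead of cloning the Arc.
trait StreamProvider {
    fn schema(&self) -> &SchemaRef;
}

/// Hypothetical provider used only for illustration.
struct FifoProvider {
    schema: SchemaRef,
}

impl StreamProvider for FifoProvider {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

fn main() {
    let provider = FifoProvider {
        schema: Arc::new(Schema::new(vec![Field::new("a1", DataType::Int64, false)])),
    };

    // Borrow where a reference is enough, e.g. `PartitionStream::schema`
    // or `create_ordering` in the diff above...
    let by_ref: &SchemaRef = provider.schema();

    // ...and clone the Arc explicitly where an owned SchemaRef is required,
    // e.g. `StreamingTableExec::try_new` and `DataSinkExec::new` in the diff.
    let owned: SchemaRef = provider.schema().clone();

    assert_eq!(by_ref.fields().len(), owned.fields().len());
}
```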
