diff --git a/README.md b/README.md index 0553c83..dcfeaa4 100644 --- a/README.md +++ b/README.md @@ -26,9 +26,8 @@ referred to as 'query federation'. ## Usage -> :warning: **All the examples are deprecated for now** - -Check out [the examples](./examples/) to get a feel for how it works. +Check out the [examples](./datafusion-federation/examples/) to get a feel for +how it works. ## Potential use-cases: diff --git a/datafusion-federation/Cargo.toml b/datafusion-federation/Cargo.toml index fa6bd74..6af448c 100644 --- a/datafusion-federation/Cargo.toml +++ b/datafusion-federation/Cargo.toml @@ -10,6 +10,12 @@ description = "Datafusion federation." name = "datafusion_federation" path = "src/lib.rs" +[package.metadata.docs.rs] +# Whether to pass `--all-features` to Cargo (default: false) +all-features = true +# Whether to pass `--no-default-features` to Cargo (default: false) +no-default-features = true + [features] sql = ["futures"] @@ -19,10 +25,10 @@ datafusion.workspace = true futures = { version = "0.3.30", optional = true } -[package.metadata.docs.rs] - -# Whether to pass `--all-features` to Cargo (default: false) -all-features = true +[dev-dependencies] +tokio = { version = "1.39.3", features = ["full"] } -# Whether to pass `--no-default-features` to Cargo (default: false) -no-default-features = true +[[example]] +name = "df-csv" +path = "examples/df-csv.rs" +required-features = ["sql"] diff --git a/datafusion-federation/examples/df-csv.rs b/datafusion-federation/examples/df-csv.rs new file mode 100644 index 0000000..24ac495 --- /dev/null +++ b/datafusion-federation/examples/df-csv.rs @@ -0,0 +1,115 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::SchemaProvider, + error::{DataFusionError, Result}, + execution::{ + context::{SessionContext, SessionState}, + options::CsvReadOptions, + }, + physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}, + 
sql::sqlparser::dialect::{Dialect, GenericDialect}, +}; +use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLSchemaProvider}; +use futures::TryStreamExt; + +const CSV_PATH: &str = "./examples/test.csv"; +const TABLE_NAME: &str = "test"; + +#[tokio::main] +async fn main() -> Result<()> { + // Create a remote context + let remote_ctx = Arc::new(SessionContext::new()); + + // Registers a CSV file + remote_ctx + .register_csv(TABLE_NAME, CSV_PATH, CsvReadOptions::new()) + .await?; + let known_tables: Vec<String> = [TABLE_NAME].iter().map(|&x| x.into()).collect(); + + // Register schema + let executor = Arc::new(InMemorySQLExecutor::new(remote_ctx)); + let provider = Arc::new(SQLFederationProvider::new(executor)); + let schema_provider = + Arc::new(SQLSchemaProvider::new_with_tables(provider, known_tables).await?); + + // Local context + let state = datafusion_federation::default_session_state(); + overwrite_default_schema(&state, schema_provider)?; + let ctx = SessionContext::new_with_state(state); + + // Run query + let query = r#"SELECT * from test"#; + let df = ctx.sql(query).await?; + + // let explain = df.clone().explain(true, false)?; + // explain.show().await?; + + df.show().await +} + +fn overwrite_default_schema(state: &SessionState, schema: Arc<dyn SchemaProvider>) -> Result<()> { + let options = &state.config().options().catalog; + let catalog = state + .catalog_list() + .catalog(options.default_catalog.as_str()) + .unwrap(); + + catalog.register_schema(options.default_schema.as_str(), schema)?; + + Ok(()) +} + +pub struct InMemorySQLExecutor { + session: Arc<SessionContext>, +} + +impl InMemorySQLExecutor { + pub fn new(session: Arc<SessionContext>) -> Self { + Self { session } + } +} + +#[async_trait] +impl SQLExecutor for InMemorySQLExecutor { + fn name(&self) -> &str { + "in_memory_sql_executor" + } + + fn compute_context(&self) -> Option<String> { + None + } + + fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> { + // Execute it using the remote datafusion session context + let future_stream 
= _execute(self.session.clone(), sql.to_string()); + let stream = futures::stream::once(future_stream).try_flatten(); + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema.clone(), + stream, + ))) + } + + async fn table_names(&self) -> Result<Vec<String>> { + Err(DataFusionError::NotImplemented( + "table inference not implemented".to_string(), + )) + } + + async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> { + let sql = format!("select * from {table_name} limit 1"); + let df = self.session.sql(&sql).await?; + let schema = df.schema().as_arrow().clone(); + Ok(Arc::new(schema)) + } + + fn dialect(&self) -> Arc<dyn Dialect> { + Arc::new(GenericDialect {}) + } +} + +async fn _execute(ctx: Arc<SessionContext>, sql: String) -> Result<SendableRecordBatchStream> { + ctx.sql(&sql).await?.execute_stream().await +} diff --git a/datafusion-federation/examples/test.csv b/datafusion-federation/examples/test.csv new file mode 100644 index 0000000..811d276 --- /dev/null +++ b/datafusion-federation/examples/test.csv @@ -0,0 +1,4 @@ +foo,bar +a,1 +b,2 +c,3 \ No newline at end of file