diff --git a/Cargo.toml b/Cargo.toml index 4805745e66..021ccc3c3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ resolver = "2" [workspace.dependencies] bincode = { version = "2.0.0-rc.3", features = ["derive"] } -datafusion = { version = "33.0.0" } [patch.crates-io] postgres = { git = "https://github.com/getdozer/rust-postgres" } diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index b068d5107b..7964a3e81b 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -8,14 +8,14 @@ authors = ["getdozer/dozer-dev"] [dependencies] dozer-ingestion-connector = { path = "./connector" } -dozer-ingestion-deltalake = { path = "./deltalake" } +dozer-ingestion-deltalake = { path = "./deltalake", optional = true } dozer-ingestion-ethereum = { path = "./ethereum", optional = true } dozer-ingestion-grpc = { path = "./grpc" } -dozer-ingestion-javascript = { path = "./javascript" } +dozer-ingestion-javascript = { path = "./javascript", optional = true } dozer-ingestion-kafka = { path = "./kafka", optional = true } dozer-ingestion-mongodb = { path = "./mongodb", optional = true } dozer-ingestion-mysql = { path = "./mysql" } -dozer-ingestion-object-store = { path = "./object-store" } +dozer-ingestion-object-store = { path = "./object-store", optional = true } dozer-ingestion-postgres = { path = "./postgres" } dozer-ingestion-snowflake = { path = "./snowflake", optional = true } dozer-ingestion-aerospike = { path = "./aerospike" } @@ -45,6 +45,12 @@ snowflake = ["dep:dozer-ingestion-snowflake"] ethereum = ["dep:dozer-ingestion-ethereum"] kafka = ["dep:dozer-ingestion-kafka"] mongodb = ["dep:dozer-ingestion-mongodb"] +datafusion = [ + "dep:dozer-ingestion-deltalake", + "dep:dozer-ingestion-object-store", +] +javascript = ["dep:dozer-ingestion-javascript"] + [[bench]] name = "connectors" diff --git a/dozer-ingestion/object-store/Cargo.toml b/dozer-ingestion/object-store/Cargo.toml index 3604b93494..9ee1b29422 100644 --- a/dozer-ingestion/object-store/Cargo.toml +++ b/dozer-ingestion/object-store/Cargo.toml @@ -9,4 +9,4 @@ edition = "2021" dozer-ingestion-connector = { path = "../connector" } object_store = { version = "0.7.1", features = ["aws"] } url = "2.4.1" -datafusion = { workspace = true } +datafusion = { version = "33.0.0" } diff --git a/dozer-ingestion/src/errors.rs b/dozer-ingestion/src/errors.rs index 803b5a2c8b..5f6646c4e2 100644 --- a/dozer-ingestion/src/errors.rs +++ b/dozer-ingestion/src/errors.rs @@ -27,6 +27,12 @@ pub enum ConnectorError { #[error("mongodb feature is not enabled")] MongodbFeatureNotEnabled, + #[error("datafusion feature is not enabled")] + DatafusionFeatureNotEnabled, + + #[error("javascript feature is not enabled")] + JavascrtiptFeatureNotEnabled, + #[error("{0} is not supported as a source connector")] Unsupported(String), } diff --git a/dozer-ingestion/src/lib.rs b/dozer-ingestion/src/lib.rs index 9e4cda2d0c..72090e531a 100644 --- a/dozer-ingestion/src/lib.rs +++ b/dozer-ingestion/src/lib.rs @@ -13,16 +13,19 @@ use dozer_ingestion_connector::dozer_types::{ node::NodeHandle, prettytable::Table, }; +#[cfg(feature = "datafusion")] use dozer_ingestion_deltalake::DeltaLakeConnector; #[cfg(feature = "ethereum")] use dozer_ingestion_ethereum::{EthLogConnector, EthTraceConnector}; use dozer_ingestion_grpc::{connector::GrpcConnector, ArrowAdapter, DefaultAdapter}; +#[cfg(feature = "javascript")] use dozer_ingestion_javascript::JavaScriptConnector; #[cfg(feature = "kafka")] use dozer_ingestion_kafka::connector::KafkaConnector; #[cfg(feature = "mongodb")] use dozer_ingestion_mongodb::MongodbConnector; use dozer_ingestion_mysql::connector::{mysql_connection_opts_from_url, MySQLConnector}; +#[cfg(feature = "datafusion")] use dozer_ingestion_object_store::connector::ObjectStoreConnector; use dozer_ingestion_oracle::OracleConnector; use dozer_ingestion_postgres::{ @@ -40,6 +43,7 @@ pub use dozer_ingestion_connector::*; const DEFAULT_POSTGRES_SNAPSHOT_BATCH_SIZE: u32 = 100_000; +#[allow(unused_variables)] pub fn get_connector( runtime: Arc, event_hub: EventHub, @@ -110,15 +114,24 @@ pub fn get_connector( ConnectionConfig::Kafka(kafka_config) => Ok(Box::new(KafkaConnector::new(kafka_config))), #[cfg(not(feature = "kafka"))] ConnectionConfig::Kafka(_) => Err(ConnectorError::KafkaFeatureNotEnabled), + #[cfg(feature = "datafusion")] ConnectionConfig::S3Storage(object_store_config) => { Ok(Box::new(ObjectStoreConnector::new(object_store_config))) } + #[cfg(feature = "datafusion")] ConnectionConfig::LocalStorage(object_store_config) => { Ok(Box::new(ObjectStoreConnector::new(object_store_config))) } + #[cfg(feature = "datafusion")] ConnectionConfig::DeltaLake(delta_lake_config) => { Ok(Box::new(DeltaLakeConnector::new(delta_lake_config))) } + #[cfg(not(feature = "datafusion"))] + ConnectionConfig::DeltaLake(_) => Err(ConnectorError::DatafusionFeatureNotEnabled), + #[cfg(not(feature = "datafusion"))] + ConnectionConfig::LocalStorage(_) => Err(ConnectorError::DatafusionFeatureNotEnabled), + #[cfg(not(feature = "datafusion"))] + ConnectionConfig::S3Storage(_) => Err(ConnectorError::DatafusionFeatureNotEnabled), #[cfg(feature = "mongodb")] ConnectionConfig::MongoDB(mongodb_config) => { let connection_string = mongodb_config.connection_string; @@ -137,6 +150,9 @@ pub fn get_connector( ConnectionConfig::Webhook(webhook_config) => { Ok(Box::new(WebhookConnector::new(webhook_config))) } + #[cfg(not(feature = "javascript"))] + ConnectionConfig::JavaScript(_) => Err(ConnectorError::JavascrtiptFeatureNotEnabled), + #[cfg(feature = "javascript")] ConnectionConfig::JavaScript(javascript_config) => Ok(Box::new(JavaScriptConnector::new( runtime, javascript_config, diff --git a/dozer-ingestion/tests/test_connectors.rs b/dozer-ingestion/tests/test_connectors.rs index 593234f82f..307fa4f29e 100644 --- a/dozer-ingestion/tests/test_connectors.rs +++ b/dozer-ingestion/tests/test_connectors.rs @@ -6,12 +6,13 @@ use test_suite::{ }; use tokio::runtime::Runtime; +#[cfg(feature = "datafusion")] #[test] fn test_local_storage() { let runtime = create_test_runtime(); runtime.block_on(test_local_storage_impl(runtime.clone())); } - +#[cfg(feature = "datafusion")] async fn test_local_storage_impl(runtime: Arc) { let _ = env_logger::builder().is_test(true).try_init(); diff --git a/dozer-ingestion/tests/test_suite/connectors/mod.rs b/dozer-ingestion/tests/test_suite/connectors/mod.rs index 93d0c601b0..9e86c00eef 100644 --- a/dozer-ingestion/tests/test_suite/connectors/mod.rs +++ b/dozer-ingestion/tests/test_suite/connectors/mod.rs @@ -1,4 +1,5 @@ mod arrow; +#[cfg(feature = "datafusion")] mod object_store; mod postgres; mod sql; @@ -8,6 +9,6 @@ mod mongodb; #[cfg(feature = "mongodb")] pub use self::mongodb::MongodbConnectorTest; - +#[cfg(feature = "datafusion")] pub use self::object_store::LocalStorageObjectStoreConnectorTest; pub use self::postgres::PostgresConnectorTest; diff --git a/dozer-ingestion/tests/test_suite/mod.rs b/dozer-ingestion/tests/test_suite/mod.rs index e36781f1d5..309fe6642a 100644 --- a/dozer-ingestion/tests/test_suite/mod.rs +++ b/dozer-ingestion/tests/test_suite/mod.rs @@ -51,4 +51,6 @@ pub use basic::{ #[cfg(feature = "mongodb")] pub use connectors::MongodbConnectorTest; -pub use connectors::{LocalStorageObjectStoreConnectorTest, PostgresConnectorTest}; +pub use connectors::PostgresConnectorTest; +#[cfg(feature = "datafusion")] +pub use LocalStorageObjectStoreConnectorTest; diff --git a/dozer-sql/expression/Cargo.toml b/dozer-sql/expression/Cargo.toml index 29e638a6fa..ce26e8e572 100644 --- a/dozer-sql/expression/Cargo.toml +++ b/dozer-sql/expression/Cargo.toml @@ -6,7 +6,7 @@ authors = ["getdozer/dozer-dev"] [dependencies] dozer-types = { path = "../../dozer-types" } -dozer-deno = { path = "../../dozer-deno" } +dozer-deno = { path = "../../dozer-deno", optional = true } dozer-core = { path = "../../dozer-core" } num-traits = "0.2.16" sqlparser = { git = "https://github.com/getdozer/sqlparser-rs.git" } @@ -27,3 +27,4 @@ proptest = "1.2.0" bigdecimal = ["dep:bigdecimal", "sqlparser/bigdecimal"] python = ["dozer-types/python-auto-initialize"] onnx = ["dep:ort", "dep:ndarray", "dep:half"] +javascript = ["dep:dozer-deno"] diff --git a/dozer-sql/expression/src/builder.rs b/dozer-sql/expression/src/builder.rs index 495cf4d9fe..c1989c1020 100644 --- a/dozer-sql/expression/src/builder.rs +++ b/dozer-sql/expression/src/builder.rs @@ -27,6 +27,7 @@ use crate::scalar::string::TrimType; use super::cast::CastOperatorType; +#[allow(dead_code)] #[derive(Clone, Debug)] pub struct ExpressionBuilder { // Must be an aggregation function @@ -544,15 +545,25 @@ impl ExpressionBuilder { Err(Error::OnnxNotEnabled) } } + UdfType::JavaScript(config) => { - self.parse_javascript_udf( - function_name.clone(), - config, - sql_function, - schema, - udfs, - ) - .await + #[cfg(feature = "javasscript")] + { + self.parse_javascript_udf( + function_name.clone(), + config, + sql_function, + schema, + udfs, + ) + .await + } + + #[cfg(not(feature = "javascript"))] + { + let _ = config; + Err(Error::OnnxNotEnabled) + } } }; } @@ -947,6 +958,7 @@ impl ExpressionBuilder { }) } + #[cfg(feature = "datafusion")] async fn parse_javascript_udf( &mut self, name: String, diff --git a/dozer-sql/expression/src/error.rs b/dozer-sql/expression/src/error.rs index 7e4ee3572e..1fd7a9bef1 100644 --- a/dozer-sql/expression/src/error.rs +++ b/dozer-sql/expression/src/error.rs @@ -100,6 +100,10 @@ pub enum Error { #[error("ONNX UDF is not enabled")] OnnxNotEnabled, + #[error("Javascript is not enabled")] + JavaScriptNotEnabled, + + #[cfg(feature = "javasscript")] #[error("JavaScript UDF error: {0}")] JavaScript(#[from] crate::javascript::Error), diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs index 08e7cbf442..09ce5c9853 100644 --- a/dozer-sql/expression/src/execution.rs +++ b/dozer-sql/expression/src/execution.rs @@ -96,6 +96,7 @@ pub enum Expression { session: crate::onnx::DozerSession, args: Vec, }, + #[cfg(feature = "javascript")] JavaScriptUdf(crate::javascript::Udf), } @@ -275,6 +276,7 @@ impl Expression { .as_str() + ")" } + #[cfg(feature = "javascript")] Expression::JavaScriptUdf(udf) => udf.to_string(schema), } } @@ -363,6 +365,7 @@ impl Expression { results, else_result, } => evaluate_case(schema, operand, conditions, results, else_result, record), + #[cfg(feature = "javascript")] Expression::JavaScriptUdf(udf) => udf.evaluate(record, schema), } } @@ -471,6 +474,7 @@ impl Expression { SourceDefinition::Dynamic, false, )), + #[cfg(feature = "javascript")] Expression::JavaScriptUdf(udf) => Ok(udf.get_type()), } } diff --git a/dozer-sql/expression/src/lib.rs b/dozer-sql/expression/src/lib.rs index 7147bea48e..8ad5fda4a6 100644 --- a/dozer-sql/expression/src/lib.rs +++ b/dozer-sql/expression/src/lib.rs @@ -16,6 +16,7 @@ mod mathematical; pub mod operator; pub mod scalar; +#[cfg(feature = "javascript")] mod javascript; #[cfg(feature = "onnx")] mod onnx;