Skip to content

Commit

Permalink
chore: enable javascript and datafusion features (#2449)
Browse files Browse the repository at this point in the history
* chore: enable javascript and datafusion features

* chore: fix reqwest dependency

* chore: fix reqwest dependency
  • Loading branch information
v3g42 authored Mar 9, 2024
1 parent b2c3d0e commit 898ad5b
Show file tree
Hide file tree
Showing 16 changed files with 135 additions and 81 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ resolver = "2"

[workspace.dependencies]
bincode = { version = "2.0.0-rc.3", features = ["derive"] }
datafusion = { version = "33.0.0" }

[patch.crates-io]
postgres = { git = "https://github.com/getdozer/rust-postgres" }
Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ rustyline = "12.0.0"
rustyline-derive = "0.9.0"
futures = "0.3.28"
page_size = "0.6.0"
reqwest = { version = "0.11.20", features = [
reqwest = { version = "0.11.23", features = [
"rustls-tls",
"cookies",
"json",
], default-features = false }
glob = "0.3.1"
atty = "0.2.14"
Expand Down
12 changes: 9 additions & 3 deletions dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ authors = ["getdozer/dozer-dev"]

[dependencies]
dozer-ingestion-connector = { path = "./connector" }
dozer-ingestion-deltalake = { path = "./deltalake" }
dozer-ingestion-deltalake = { path = "./deltalake", optional = true }
dozer-ingestion-ethereum = { path = "./ethereum", optional = true }
dozer-ingestion-grpc = { path = "./grpc" }
dozer-ingestion-javascript = { path = "./javascript" }
dozer-ingestion-javascript = { path = "./javascript", optional = true }
dozer-ingestion-kafka = { path = "./kafka", optional = true }
dozer-ingestion-mongodb = { path = "./mongodb", optional = true }
dozer-ingestion-mysql = { path = "./mysql" }
dozer-ingestion-object-store = { path = "./object-store" }
dozer-ingestion-object-store = { path = "./object-store", optional = true }
dozer-ingestion-postgres = { path = "./postgres" }
dozer-ingestion-snowflake = { path = "./snowflake", optional = true }
dozer-ingestion-aerospike = { path = "./aerospike" }
Expand Down Expand Up @@ -45,6 +45,12 @@ snowflake = ["dep:dozer-ingestion-snowflake"]
ethereum = ["dep:dozer-ingestion-ethereum"]
kafka = ["dep:dozer-ingestion-kafka"]
mongodb = ["dep:dozer-ingestion-mongodb"]
datafusion = [
"dep:dozer-ingestion-deltalake",
"dep:dozer-ingestion-object-store",
]
javascript = ["dep:dozer-ingestion-javascript"]


[[bench]]
name = "connectors"
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ edition = "2021"
dozer-ingestion-connector = { path = "../connector" }
object_store = { version = "0.7.1", features = ["aws"] }
url = "2.4.1"
datafusion = { workspace = true }
datafusion = { version = "33.0.0" }
6 changes: 6 additions & 0 deletions dozer-ingestion/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ pub enum ConnectorError {
#[error("mongodb feature is not enabled")]
MongodbFeatureNotEnabled,

#[error("datafusion feature is not enabled")]
DatafusionFeatureNotEnabled,

#[error("javascript feature is not enabled")]
JavascrtiptFeatureNotEnabled,

#[error("{0} is not supported as a source connector")]
Unsupported(String),
}
16 changes: 16 additions & 0 deletions dozer-ingestion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ use dozer_ingestion_connector::dozer_types::{
node::NodeHandle,
prettytable::Table,
};
#[cfg(feature = "datafusion")]
use dozer_ingestion_deltalake::DeltaLakeConnector;
#[cfg(feature = "ethereum")]
use dozer_ingestion_ethereum::{EthLogConnector, EthTraceConnector};
use dozer_ingestion_grpc::{connector::GrpcConnector, ArrowAdapter, DefaultAdapter};
#[cfg(feature = "javascript")]
use dozer_ingestion_javascript::JavaScriptConnector;
#[cfg(feature = "kafka")]
use dozer_ingestion_kafka::connector::KafkaConnector;
#[cfg(feature = "mongodb")]
use dozer_ingestion_mongodb::MongodbConnector;
use dozer_ingestion_mysql::connector::{mysql_connection_opts_from_url, MySQLConnector};
#[cfg(feature = "datafusion")]
use dozer_ingestion_object_store::connector::ObjectStoreConnector;
use dozer_ingestion_oracle::OracleConnector;
use dozer_ingestion_postgres::{
Expand All @@ -40,6 +43,7 @@ pub use dozer_ingestion_connector::*;

const DEFAULT_POSTGRES_SNAPSHOT_BATCH_SIZE: u32 = 100_000;

#[allow(unused_variables)]
pub fn get_connector(
runtime: Arc<Runtime>,
event_hub: EventHub,
Expand Down Expand Up @@ -110,15 +114,24 @@ pub fn get_connector(
ConnectionConfig::Kafka(kafka_config) => Ok(Box::new(KafkaConnector::new(kafka_config))),
#[cfg(not(feature = "kafka"))]
ConnectionConfig::Kafka(_) => Err(ConnectorError::KafkaFeatureNotEnabled),
#[cfg(feature = "datafusion")]
ConnectionConfig::S3Storage(object_store_config) => {
Ok(Box::new(ObjectStoreConnector::new(object_store_config)))
}
#[cfg(feature = "datafusion")]
ConnectionConfig::LocalStorage(object_store_config) => {
Ok(Box::new(ObjectStoreConnector::new(object_store_config)))
}
#[cfg(feature = "datafusion")]
ConnectionConfig::DeltaLake(delta_lake_config) => {
Ok(Box::new(DeltaLakeConnector::new(delta_lake_config)))
}
#[cfg(not(feature = "datafusion"))]
ConnectionConfig::DeltaLake(_) => Err(ConnectorError::DatafusionFeatureNotEnabled),
#[cfg(not(feature = "datafusion"))]
ConnectionConfig::LocalStorage(_) => Err(ConnectorError::DatafusionFeatureNotEnabled),
#[cfg(not(feature = "datafusion"))]
ConnectionConfig::S3Storage(_) => Err(ConnectorError::DatafusionFeatureNotEnabled),
#[cfg(feature = "mongodb")]
ConnectionConfig::MongoDB(mongodb_config) => {
let connection_string = mongodb_config.connection_string;
Expand All @@ -137,6 +150,9 @@ pub fn get_connector(
ConnectionConfig::Webhook(webhook_config) => {
Ok(Box::new(WebhookConnector::new(webhook_config)))
}
#[cfg(not(feature = "javascript"))]
ConnectionConfig::JavaScript(_) => Err(ConnectorError::JavascrtiptFeatureNotEnabled),
#[cfg(feature = "javascript")]
ConnectionConfig::JavaScript(javascript_config) => Ok(Box::new(JavaScriptConnector::new(
runtime,
javascript_config,
Expand Down
3 changes: 2 additions & 1 deletion dozer-ingestion/tests/test_connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use test_suite::{
};
use tokio::runtime::Runtime;

#[cfg(feature = "datafusion")]
#[test]
fn test_local_storage() {
let runtime = create_test_runtime();
runtime.block_on(test_local_storage_impl(runtime.clone()));
}

#[cfg(feature = "datafusion")]
async fn test_local_storage_impl(runtime: Arc<Runtime>) {
let _ = env_logger::builder().is_test(true).try_init();

Expand Down
3 changes: 2 additions & 1 deletion dozer-ingestion/tests/test_suite/connectors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod arrow;
#[cfg(feature = "datafusion")]
mod object_store;
mod postgres;
mod sql;
Expand All @@ -8,6 +9,6 @@ mod mongodb;

#[cfg(feature = "mongodb")]
pub use self::mongodb::MongodbConnectorTest;

#[cfg(feature = "datafusion")]
pub use self::object_store::LocalStorageObjectStoreConnectorTest;
pub use self::postgres::PostgresConnectorTest;
4 changes: 3 additions & 1 deletion dozer-ingestion/tests/test_suite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ pub use basic::{
#[cfg(feature = "mongodb")]
pub use connectors::MongodbConnectorTest;

pub use connectors::{LocalStorageObjectStoreConnectorTest, PostgresConnectorTest};
pub use connectors::PostgresConnectorTest;
#[cfg(feature = "datafusion")]
pub use LocalStorageObjectStoreConnectorTest;
3 changes: 2 additions & 1 deletion dozer-sql/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["getdozer/dozer-dev"]

[dependencies]
dozer-types = { path = "../../dozer-types" }
dozer-deno = { path = "../../dozer-deno" }
dozer-deno = { path = "../../dozer-deno", optional = true }
dozer-core = { path = "../../dozer-core" }
num-traits = "0.2.16"
sqlparser = { git = "https://github.com/getdozer/sqlparser-rs.git" }
Expand All @@ -27,3 +27,4 @@ proptest = "1.2.0"
bigdecimal = ["dep:bigdecimal", "sqlparser/bigdecimal"]
python = ["dozer-types/python-auto-initialize"]
onnx = ["dep:ort", "dep:ndarray", "dep:half"]
javascript = ["dep:dozer-deno"]
28 changes: 20 additions & 8 deletions dozer-sql/expression/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::scalar::string::TrimType;

use super::cast::CastOperatorType;

#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct ExpressionBuilder {
// Must be an aggregation function
Expand Down Expand Up @@ -544,15 +545,25 @@ impl ExpressionBuilder {
Err(Error::OnnxNotEnabled)
}
}

UdfType::JavaScript(config) => {
self.parse_javascript_udf(
function_name.clone(),
config,
sql_function,
schema,
udfs,
)
.await
#[cfg(feature = "javasscript")]
{
self.parse_javascript_udf(
function_name.clone(),
config,
sql_function,
schema,
udfs,
)
.await
}

#[cfg(not(feature = "javascript"))]
{
let _ = config;
Err(Error::OnnxNotEnabled)
}
}
};
}
Expand Down Expand Up @@ -947,6 +958,7 @@ impl ExpressionBuilder {
})
}

#[cfg(feature = "javascript")]
async fn parse_javascript_udf(
&mut self,
name: String,
Expand Down
4 changes: 4 additions & 0 deletions dozer-sql/expression/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ pub enum Error {
#[error("ONNX UDF is not enabled")]
OnnxNotEnabled,

#[error("Javascript is not enabled")]
JavaScriptNotEnabled,

#[cfg(feature = "javasscript")]
#[error("JavaScript UDF error: {0}")]
JavaScript(#[from] crate::javascript::Error),

Expand Down
4 changes: 4 additions & 0 deletions dozer-sql/expression/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub enum Expression {
session: crate::onnx::DozerSession,
args: Vec<Expression>,
},
#[cfg(feature = "javascript")]
JavaScriptUdf(crate::javascript::Udf),
}

Expand Down Expand Up @@ -275,6 +276,7 @@ impl Expression {
.as_str()
+ ")"
}
#[cfg(feature = "javascript")]
Expression::JavaScriptUdf(udf) => udf.to_string(schema),
}
}
Expand Down Expand Up @@ -363,6 +365,7 @@ impl Expression {
results,
else_result,
} => evaluate_case(schema, operand, conditions, results, else_result, record),
#[cfg(feature = "javascript")]
Expression::JavaScriptUdf(udf) => udf.evaluate(record, schema),
}
}
Expand Down Expand Up @@ -471,6 +474,7 @@ impl Expression {
SourceDefinition::Dynamic,
false,
)),
#[cfg(feature = "javascript")]
Expression::JavaScriptUdf(udf) => Ok(udf.get_type()),
}
}
Expand Down
1 change: 1 addition & 0 deletions dozer-sql/expression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod mathematical;
pub mod operator;
pub mod scalar;

#[cfg(feature = "javascript")]
mod javascript;
#[cfg(feature = "onnx")]
mod onnx;
Expand Down
Loading

0 comments on commit 898ad5b

Please sign in to comment.