From f530c10b4a614d353d17d924b69b5690aa282476 Mon Sep 17 00:00:00 2001
From: congyi wang <58715567+wcy-fdu@users.noreply.github.com>
Date: Tue, 7 Jan 2025 18:40:41 +0800
Subject: [PATCH] feat(connector): add minio file scan type and enhance test
 (#19950)

---
 e2e_test/s3/file_sink.py                      | 55 +++++++++++++++++++
 proto/batch_plan.proto                        |  1 +
 .../executors/src/executor/s3_file_scan.rs    |  8 ++-
 .../source/iceberg/parquet_file_handler.rs    | 18 +++---
 src/frontend/src/expr/table_function.rs       | 19 +++++--
 .../optimizer/plan_node/batch_file_scan.rs    |  1 +
 .../optimizer/plan_node/generic/file_scan.rs  |  1 +
 .../optimizer/plan_node/logical_file_scan.rs  |  6 +-
 .../rule/table_function_to_file_scan_rule.rs  | 14 ++++-
 .../src/object/opendal_engine/opendal_s3.rs   |  1 -
 10 files changed, 102 insertions(+), 22 deletions(-)

diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py
index a64f40d0692df..bdabdbbd08e44 100644
--- a/e2e_test/s3/file_sink.py
+++ b/e2e_test/s3/file_sink.py
@@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix):
     def _table():
         return 's3_test_parquet'
 
+    print("test table function file scan")
+    cur.execute(f'''
+    SELECT
+        id,
+        name,
+        sex,
+        mark,
+        test_int,
+        test_int8,
+        test_uint8,
+        test_uint16,
+        test_uint32,
+        test_uint64,
+        test_float_16,
+        test_real,
+        test_double_precision,
+        test_varchar,
+        test_bytea,
+        test_date,
+        test_time,
+        test_timestamp_s,
+        test_timestamp_ms,
+        test_timestamp_us,
+        test_timestamp_ns,
+        test_timestamptz_s,
+        test_timestamptz_ms,
+        test_timestamptz_us,
+        test_timestamptz_ns
+    FROM file_scan(
+        'parquet',
+        's3',
+        'http://127.0.0.1:9301',
+        'hummockadmin',
+        'hummockadmin',
+        's3://hummock001/test_file_scan/test_file_scan.parquet'
+    );''')
+    result = cur.fetchone()
+    assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.'
+
+    print("file scan test pass")
     # Execute a SELECT statement
     cur.execute(f'''CREATE TABLE {_table()}(
         id bigint primary key,
@@ -491,6 +531,21 @@ def _assert_greater(field, got, expect):
             _s3(idx),
             _local(idx)
         )
+    # put parquet file to test table function file scan
+    if data:
+        first_file_data = data[0]
+        first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data))
+
+        first_file_name = f"test_file_scan.parquet"
+        first_file_path = f"test_file_scan/{first_file_name}"
+
+        pq.write_table(first_table, "data_0.parquet")
+
+        client.fput_object(
+            "hummock001",
+            first_file_path,
+            "data_0.parquet"
+        )
 
     # do test
     do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 14c587cad99b4..a105b7e3ece5b 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -99,6 +99,7 @@ message FileScanNode {
   string s3_access_key = 5;
   string s3_secret_key = 6;
   repeated string file_location = 7;
+  string s3_endpoint = 8;
 }
 
 message GcsFileScanNode {
diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs
index ee3b08508cd66..09545d615c795 100644
--- a/src/batch/executors/src/executor/s3_file_scan.rs
+++ b/src/batch/executors/src/executor/s3_file_scan.rs
@@ -20,7 +20,6 @@ use risingwave_connector::source::iceberg::{
     extract_bucket_and_file_name, new_s3_operator, read_parquet_file, FileScanBackend,
 };
 use risingwave_pb::batch_plan::file_scan_node;
-use risingwave_pb::batch_plan::file_scan_node::StorageType;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 
 use crate::error::BatchError;
@@ -38,6 +37,7 @@ pub struct S3FileScanExecutor {
     s3_region: String,
     s3_access_key: String,
     s3_secret_key: String,
+    s3_endpoint: String,
     batch_size: usize,
     schema: Schema,
     identity: String,
@@ -67,6 +67,7 @@ impl S3FileScanExecutor {
         batch_size: usize,
         schema: Schema,
         identity: String,
+        s3_endpoint: String,
     ) -> Self {
         Self {
             file_format,
@@ -74,6 +75,7 @@ impl S3FileScanExecutor {
             s3_region,
             s3_access_key,
             s3_secret_key,
+            s3_endpoint,
             batch_size,
             schema,
             identity,
@@ -90,6 +92,7 @@ impl S3FileScanExecutor {
             self.s3_access_key.clone(),
             self.s3_secret_key.clone(),
             bucket.clone(),
+            self.s3_endpoint.clone(),
         )?;
         let chunk_stream =
             read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
@@ -115,8 +118,6 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
             NodeBody::FileScan
         )?;
 
-        assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32);
-
         Ok(Box::new(S3FileScanExecutor::new(
             match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                 file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
@@ -129,6 +130,7 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
             source.context().get_config().developer.chunk_size,
             Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
             source.plan_node().get_identity().clone(),
+            file_scan_node.s3_endpoint.clone(),
         )))
     }
 }
diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs
index 2cae369aa6a22..57854687a3e9c 100644
--- a/src/connector/src/source/iceberg/parquet_file_handler.rs
+++ b/src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -109,16 +109,16 @@ pub fn new_s3_operator(
     s3_access_key: String,
     s3_secret_key: String,
     bucket: String,
+    s3_endpoint: String,
 ) -> ConnectorResult<Operator> {
-    // Create s3 builder.
-    let mut builder = S3::default().bucket(&bucket).region(&s3_region);
-    builder = builder.secret_access_key(&s3_access_key);
-    builder = builder.secret_access_key(&s3_secret_key);
-    builder = builder.endpoint(&format!(
-        "https://{}.s3.{}.amazonaws.com",
-        bucket, s3_region
-    ));
-
+    let mut builder = S3::default();
+    builder = builder
+        .region(&s3_region)
+        .endpoint(&s3_endpoint)
+        .access_key_id(&s3_access_key)
+        .secret_access_key(&s3_secret_key)
+        .bucket(&bucket)
+        .disable_config_load();
     let op: Operator = Operator::new(builder)?
         .layer(LoggingLayer::default())
         .layer(RetryLayer::default())
diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs
index 7905747a52b88..03ac2c50cc0d9 100644
--- a/src/frontend/src/expr/table_function.rs
+++ b/src/frontend/src/expr/table_function.rs
@@ -159,14 +159,24 @@ impl TableFunction {
             };
             let op = match file_scan_backend {
                 FileScanBackend::S3 => {
-                    let (bucket, _) =
-                        extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;
-
+                    let (bucket, _) = extract_bucket_and_file_name(
+                        &eval_args[5].clone(),
+                        &file_scan_backend,
+                    )?;
+
+                    let (s3_region, s3_endpoint) = match eval_args[2].starts_with("http") {
+                        true => ("us-east-1".to_owned(), eval_args[2].clone()), /* for minio, hard code region as not used but needed. */
+                        false => (
+                            eval_args[2].clone(),
+                            format!("https://{}.s3.{}.amazonaws.com", bucket, eval_args[2],),
+                        ),
+                    };
                     new_s3_operator(
-                        eval_args[2].clone(),
+                        s3_region.clone(),
                         eval_args[3].clone(),
                         eval_args[4].clone(),
                         bucket.clone(),
+                        s3_endpoint.clone(),
                     )?
                 }
                 FileScanBackend::Gcs => {
@@ -189,7 +199,6 @@ impl TableFunction {
                     Ok::<Vec<String>, anyhow::Error>(files)
                 })
             })?;
-
             if files.is_empty() {
                 return Err(BindError(
                     "file_scan function only accepts non-empty directory".to_owned(),
diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
index 0de65f4bd8555..55a39fbca8be6 100644
--- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -93,6 +93,7 @@ impl ToBatchPb for BatchFileScan {
                 s3_access_key: file_scan.s3_access_key.clone(),
                 s3_secret_key: file_scan.s3_secret_key.clone(),
                 file_location: file_scan.file_location.clone(),
+                s3_endpoint: file_scan.s3_endpoint.clone(),
             }),
             generic::FileScanBackend::GcsFileScan(gcs_file_scan) => {
                 NodeBody::GcsFileScan(GcsFileScanNode {
diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
index 95ea6e1f4fcce..03bf31ce14336 100644
--- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -92,6 +92,7 @@ pub struct FileScan {
     pub s3_access_key: String,
     pub s3_secret_key: String,
     pub file_location: Vec<String>,
+    pub s3_endpoint: String,
 
     #[educe(PartialEq(ignore))]
     #[educe(Hash(ignore))]
diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
index 429e1974d8e84..847d5f0f7e47b 100644
--- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -48,19 +48,22 @@ impl LogicalFileScan {
         s3_access_key: String,
         s3_secret_key: String,
         file_location: Vec<String>,
+        s3_endpoint: String,
     ) -> Self {
         assert!("parquet".eq_ignore_ascii_case(&file_format));
         assert!("s3".eq_ignore_ascii_case(&storage_type));
+        let storage_type = generic::StorageType::S3;
 
         let core = generic::FileScanBackend::FileScan(generic::FileScan {
             schema,
             file_format: generic::FileFormat::Parquet,
-            storage_type: generic::StorageType::S3,
+            storage_type,
             s3_region,
             s3_access_key,
             s3_secret_key,
             file_location,
             ctx,
+            s3_endpoint,
         });
 
         let base = PlanBase::new_logical_with_core(&core);
@@ -89,7 +92,6 @@ impl LogicalFileScan {
         });
 
         let base = PlanBase::new_logical_with_core(&core);
-
         LogicalFileScan { base, core }
     }
 }
diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
index 1c081796bbe22..c722bb7c78344 100644
--- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
+++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
@@ -16,6 +16,7 @@ use itertools::Itertools;
 use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::types::{DataType, ScalarImpl};
 use risingwave_common::util::iter_util::ZipEqDebug;
+use risingwave_connector::source::iceberg::{extract_bucket_and_file_name, FileScanBackend};
 
 use super::{BoxedRule, Rule};
 use crate::expr::{Expr, TableFunctionType};
@@ -63,11 +64,19 @@ impl Rule for TableFunctionToFileScanRule {
             );
 
             if "s3".eq_ignore_ascii_case(&eval_args[1]) {
-                let s3_region = eval_args[2].clone();
                 let s3_access_key = eval_args[3].clone();
                 let s3_secret_key = eval_args[4].clone();
-                // The rest of the arguments are file locations
                 let file_location = eval_args[5..].iter().cloned().collect_vec();
+
+                let (bucket, _) =
+                    extract_bucket_and_file_name(&file_location[0], &FileScanBackend::S3).ok()?;
+                let (s3_region, s3_endpoint) = match eval_args[2].starts_with("http") {
+                    true => ("us-east-1".to_owned(), eval_args[2].clone()), /* for minio, hard code region as not used but needed. */
+                    false => (
+                        eval_args[2].clone(),
+                        format!("https://{}.s3.{}.amazonaws.com", bucket, eval_args[2],),
+                    ),
+                };
                 Some(
                     LogicalFileScan::new_s3_logical_file_scan(
                         logical_table_function.ctx(),
@@ -78,6 +87,7 @@ impl Rule for TableFunctionToFileScanRule {
                         s3_access_key,
                         s3_secret_key,
                         file_location,
+                        s3_endpoint,
                     )
                     .into(),
                 )
diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs
index 918a18df6f020..24b4783ff761e 100644
--- a/src/object_store/src/object/opendal_engine/opendal_s3.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -78,7 +78,6 @@ impl OpendalObjectStore {
             "http://"
         };
         let (address, bucket) = rest.split_once('/').unwrap();
-
         let builder = S3::default()
             .bucket(bucket)
             .region("custom")
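
Usage sketch (illustrative, not part of the patch): the e2e hunk above drives the new MinIO-backed file_scan through a DB-API cursor. The snippet below reproduces the same call against a locally running cluster; psycopg2 and the frontend defaults (port 4566, user root, database dev) are assumptions, while the endpoint, credentials, and object path mirror the test fixture.

    # Hypothetical reproduction of the e2e check; assumes a local RisingWave
    # frontend on its default address and the MinIO fixture used by the test.
    import psycopg2

    conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")
    cur = conn.cursor()
    # file_scan(format, backend, region_or_endpoint, access_key, secret_key, location):
    # a third argument starting with "http" selects the MinIO path added by this patch.
    cur.execute("""
        SELECT id, name
        FROM file_scan(
            'parquet',
            's3',
            'http://127.0.0.1:9301',
            'hummockadmin',
            'hummockadmin',
            's3://hummock001/test_file_scan/test_file_scan.parquet'
        )
    """)
    print(cur.fetchone())
    cur.close()
    conn.close()

As wired up in table_function.rs and table_function_to_file_scan_rule.rs, an endpoint-style third argument is forwarded as s3_endpoint with the region hard-coded to us-east-1; any other value is treated as an AWS region and the endpoint defaults to https://{bucket}.s3.{region}.amazonaws.com.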
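
Setup sketch (illustrative, not part of the patch): the test seeds MinIO with a single Parquet object at test_file_scan/test_file_scan.parquet before querying it. A minimal standalone version of that step, assuming the same bucket, endpoint, and credentials as the test harness (the DataFrame rows are placeholders):

    # Hypothetical seeding step: write a small Parquet file and upload it to the
    # MinIO bucket that the file_scan query reads from.
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    from minio import Minio

    df = pd.DataFrame({"id": [0, 1, 2], "name": ["a", "b", "c"]})  # placeholder rows
    pq.write_table(pa.Table.from_pandas(df), "data_0.parquet")

    client = Minio(
        "127.0.0.1:9301",
        access_key="hummockadmin",
        secret_key="hummockadmin",
        secure=False,  # the test endpoint is plain http
    )
    client.fput_object("hummock001", "test_file_scan/test_file_scan.parquet", "data_0.parquet")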