diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt index 842e3f3303438..953bc694d744a 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt @@ -22,6 +22,7 @@ CREATE TABLE test_route ( statement ok CREATE SINK test_route_sink from test_route WITH ( + type = 'upsert', connector = 'elasticsearch', index = 'test_route', url = 'http://elasticsearch:9200', @@ -32,6 +33,7 @@ CREATE SINK test_route_sink from test_route WITH ( statement ok CREATE SINK s7 from t7 WITH ( + type = 'upsert', connector = 'elasticsearch', index = 'test', url = 'http://elasticsearch:9200', @@ -41,6 +43,7 @@ CREATE SINK s7 from t7 WITH ( statement ok CREATE SINK s8 from t7 WITH ( + type = 'upsert', connector = 'elasticsearch', index = 'test1', primary_key = 'v1,v3', diff --git a/e2e_test/udf/create_and_drop.slt b/e2e_test/udf/create_and_drop.slt new file mode 100644 index 0000000000000..7b31dba16fdbd --- /dev/null +++ b/e2e_test/udf/create_and_drop.slt @@ -0,0 +1,115 @@ +# https://github.com/risingwavelabs/risingwave/issues/17263 + +statement ok +create table t (a int, b int); + +statement ok +create function add(a int, b int) returns int language python as $$ +def add(a, b): + return a+b +$$; + +statement error function with name add\(integer,integer\) exists +create function add(int, int) returns int language sql as $$select $1 + $2$$; + +statement ok +create function if not exists add(int, int) returns int language sql as $$select $1 + $2$$; + +statement ok +create function add_v2(int, int) returns int language sql as $$select $1 + $2$$; + +statement ok +create aggregate mysum(value int) returns int language python as $$ +def create_state(): + return 0 +def accumulate(state, value): + return state + value +def finish(state): + return state +$$; + +statement error function with name mysum\(integer\) exists +create aggregate mysum(value int) returns int language python as $$ +def create_state(): + return 0 +def accumulate(state, value): + return state + value +def finish(state): + return state +$$; + +statement ok +create aggregate if not exists mysum(value int) returns int language python as $$ +def create_state(): + return 0 +def accumulate(state, value): + return state + value +def finish(state): + return state +$$; + +statement ok +create materialized view mv as select add(a, b) + add_v2(a, b) as c from t; + +statement ok +create materialized view mv2 as select mysum(a) as s from t; + +statement error function used by 1 other objects +drop function add; + +statement error function used by 1 other objects +drop function if exists add; + +statement error function used by 1 other objects +drop function add_v2; + +statement error function used by 1 other objects +drop aggregate mysum; + +statement ok +drop materialized view mv; + +statement ok +drop materialized view mv2; + +statement ok +drop function add; + +statement error function not found +drop function add; + +statement ok +drop function if exists add; + +statement ok +drop function add_v2; + +statement ok +drop function if exists add_v2; + +statement ok +drop aggregate mysum; + +statement ok +drop aggregate if exists mysum; + +statement ok +create function add(a int, b int) returns int language python as $$ +def add(a, b): + return a+b +$$; + +statement ok +create sink s as select add(a, b) as c from t with (connector = 'blackhole'); + +statement error function used by 1 other objects +drop function add; + +statement ok +drop sink s; 
+ +statement ok +drop function add; + +statement ok +drop table t; diff --git a/e2e_test/udf/drop_function.slt b/e2e_test/udf/drop_function.slt deleted file mode 100644 index ffe4e0eea481f..0000000000000 --- a/e2e_test/udf/drop_function.slt +++ /dev/null @@ -1,52 +0,0 @@ -# https://github.com/risingwavelabs/risingwave/issues/17263 - -statement ok -create table t (a int, b int); - -statement ok -create function add(a int, b int) returns int language python as $$ -def add(a, b): - return a+b -$$; - -statement ok -create function add_v2(INT, INT) returns int language sql as $$select $1 + $2$$; - -statement ok -create materialized view mv as select add(a, b) + add_v2(a, b) as c from t; - -statement error function used by 1 other objects -drop function add; - -statement error function used by 1 other objects -drop function add_v2; - -statement ok -drop materialized view mv; - -statement ok -drop function add; - -statement ok -drop function add_v2; - -statement ok -create function add(a int, b int) returns int language python as $$ -def add(a, b): - return a+b -$$; - -statement ok -create sink s as select add(a, b) as c from t with (connector = 'blackhole'); - -statement error function used by 1 other objects -drop function add; - -statement ok -drop sink s; - -statement ok -drop function add; - -statement ok -drop table t; diff --git a/integration_tests/feature-store/simulator/Cargo.lock b/integration_tests/feature-store/simulator/Cargo.lock index 79c7d3cad88b0..1049a14ba248f 100644 --- a/integration_tests/feature-store/simulator/Cargo.lock +++ b/integration_tests/feature-store/simulator/Cargo.lock @@ -19,15 +19,16 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "anstream" -version = "0.6.4" +version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", + "is_terminal_polyfill", "utf8parse", ] @@ -52,17 +53,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.1" +version = "3.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" +checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.59.0", ] [[package]] @@ -508,6 +509,12 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.11.0" @@ -564,7 +571,7 @@ checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -797,7 +804,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -837,7 +844,7 @@ dependencies = [ 
"pin-project-lite", "socket2 0.5.4", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1037,7 +1044,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", ] [[package]] @@ -1046,13 +1062,29 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -1061,38 +1093,86 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index ff53d80e89598..6964e03a5b99f 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -387,7 +387,7 @@ impl IcebergCommon { let catalog = iceberg_catalog_rest::RestCatalog::new(config); Ok(Arc::new(catalog)) } - "glue" => { + "glue_rust" => { let mut iceberg_configs = HashMap::new(); // glue if let Some(region) = &self.region { @@ -427,7 +427,10 @@ impl IcebergCommon { Ok(Arc::new(catalog)) } catalog_type - if catalog_type == "hive" || catalog_type == "jdbc" || catalog_type == "rest" => + if catalog_type == "hive" + || catalog_type == "jdbc" + || catalog_type == "rest" + || catalog_type == "glue" => { // Create java catalog let (file_io_props, java_catalog_props) = @@ -436,6 +439,7 @@ impl IcebergCommon { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", "rest" => "org.apache.iceberg.rest.RESTCatalog", + "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", _ => unreachable!(), }; diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs index 1f0ac754fcad2..fd4cbed31698d 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs @@ -28,6 +28,7 @@ pub struct ElasticSearchSink { config: ElasticSearchOpenSearchConfig, schema: Schema, pk_indices: Vec, + is_append_only: bool, } #[async_trait] @@ -41,6 +42,7 @@ impl TryFrom for ElasticSearchSink { config, schema, pk_indices: param.downstream_pk, + is_append_only: param.sink_type.is_append_only(), }) } } @@ -64,6 +66,7 @@ impl Sink for ElasticSearchSink { self.schema.clone(), self.pk_indices.clone(), Self::SINK_NAME, + self.is_append_only, )? 
.into_log_sinker(self.config.concurrent_requests)) } diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs index dc19c99261aa7..ba2571ff2a130 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs @@ -39,6 +39,7 @@ impl StreamChunkConverter { schema: Schema, pk_indices: &Vec, properties: &BTreeMap, + is_append_only: bool, ) -> Result { if is_remote_es_sink(sink_name) { let index_column = properties @@ -71,6 +72,7 @@ impl StreamChunkConverter { index_column, index, routing_column, + is_append_only, )?)) } else { Ok(StreamChunkConverter::Other) @@ -79,13 +81,14 @@ impl StreamChunkConverter { pub fn convert_chunk(&self, chunk: StreamChunk) -> Result { match self { - StreamChunkConverter::Es(es) => es.convert_chunk(chunk), + StreamChunkConverter::Es(es) => es.convert_chunk(chunk, es.is_append_only), StreamChunkConverter::Other => Ok(chunk), } } } pub struct EsStreamChunkConverter { formatter: ElasticSearchOpenSearchFormatter, + is_append_only: bool, } impl EsStreamChunkConverter { pub fn new( @@ -95,6 +98,7 @@ impl EsStreamChunkConverter { index_column: Option, index: Option, routing_column: Option, + is_append_only: bool, ) -> Result { let formatter = ElasticSearchOpenSearchFormatter::new( pk_indices, @@ -104,10 +108,13 @@ impl EsStreamChunkConverter { index, routing_column, )?; - Ok(Self { formatter }) + Ok(Self { + formatter, + is_append_only, + }) } - fn convert_chunk(&self, chunk: StreamChunk) -> Result { + fn convert_chunk(&self, chunk: StreamChunk, is_append_only: bool) -> Result { let mut ops = Vec::with_capacity(chunk.capacity()); let mut id_string_builder = ::new(chunk.capacity()); @@ -117,7 +124,7 @@ impl EsStreamChunkConverter { ::new(chunk.capacity()); let mut routing_builder = ::new(chunk.capacity()); - for build_bulk_para in self.formatter.convert_chunk(chunk)? { + for build_bulk_para in self.formatter.convert_chunk(chunk, is_append_only)? { let BuildBulkPara { key, value, diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs index 5369fcdee6aab..eb02d1310b650 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs @@ -160,6 +160,7 @@ pub struct ElasticSearchOpenSearchSinkWriter { client: Arc, formatter: ElasticSearchOpenSearchFormatter, config: ElasticSearchOpenSearchConfig, + is_append_only: bool, } impl ElasticSearchOpenSearchSinkWriter { @@ -168,6 +169,7 @@ impl ElasticSearchOpenSearchSinkWriter { schema: Schema, pk_indices: Vec, connector: &str, + is_append_only: bool, ) -> Result { let client = Arc::new(config.build_client(connector)?); let formatter = ElasticSearchOpenSearchFormatter::new( @@ -182,6 +184,7 @@ impl ElasticSearchOpenSearchSinkWriter { client, formatter, config, + is_append_only, }) } } @@ -202,7 +205,7 @@ impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter { let mut bulks: Vec = Vec::with_capacity(chunk_capacity); let mut bulks_size = 0; - for build_bulk_para in self.formatter.convert_chunk(chunk)? { + for build_bulk_para in self.formatter.convert_chunk(chunk, self.is_append_only)? 
{ let BuildBulkPara { key, value, diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs index e2907301d78a9..24f79dd84030b 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs @@ -77,6 +77,8 @@ pub struct ElasticSearchOpenSearchConfig { #[serde_as(as = "DisplayFromStr")] #[serde(default = "default_concurrent_requests")] pub concurrent_requests: usize, + + pub r#type: String, } fn default_retry_on_conflict() -> i32 { diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs index 660e26d578834..3cfda0601b617 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs @@ -113,7 +113,11 @@ impl ElasticSearchOpenSearchFormatter { }) } - pub fn convert_chunk(&self, chunk: StreamChunk) -> Result> { + pub fn convert_chunk( + &self, + chunk: StreamChunk, + is_append_only: bool, + ) -> Result> { let mut result_vec = Vec::with_capacity(chunk.capacity()); for (op, rows) in chunk.rows() { let index = if let Some(index_column) = self.index_column { @@ -157,6 +161,11 @@ impl ElasticSearchOpenSearchFormatter { }); } Op::Delete => { + if is_append_only { + return Err(SinkError::ElasticSearchOpenSearch(anyhow!( + "`Delete` operation is not supported in `append_only` mode" + ))); + } let key = self.key_encoder.encode(rows)?; let mem_size_b = std::mem::size_of_val(&key); result_vec.push(BuildBulkPara { @@ -167,7 +176,15 @@ impl ElasticSearchOpenSearchFormatter { routing_column, }); } - Op::UpdateDelete => continue, + Op::UpdateDelete => { + if is_append_only { + return Err(SinkError::ElasticSearchOpenSearch(anyhow!( + "`UpdateDelete` operation is not supported in `append_only` mode" + ))); + } else { + continue; + } + } } } Ok(result_vec) diff --git a/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs b/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs index b0df3c8ca2345..bf9b79f6a8850 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs @@ -30,6 +30,7 @@ pub struct OpenSearchSink { config: ElasticSearchOpenSearchConfig, schema: Schema, pk_indices: Vec, + is_append_only: bool, } #[async_trait] @@ -43,6 +44,7 @@ impl TryFrom for OpenSearchSink { config, schema, pk_indices: param.downstream_pk, + is_append_only: param.sink_type.is_append_only(), }) } } @@ -69,6 +71,7 @@ impl Sink for OpenSearchSink { self.schema.clone(), self.pk_indices.clone(), Self::SINK_NAME, + self.is_append_only, )? 
.into_log_sinker(self.config.concurrent_requests)) } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 26683966f566b..582a5c861bb5c 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -301,6 +301,7 @@ impl RemoteLogSinker { sink_param.schema(), &sink_param.downstream_pk, &sink_param.properties, + sink_param.sink_type.is_append_only(), )?, }) } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index d7291c8ecc61d..819e597fec448 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -331,6 +331,9 @@ ElasticSearchOpenSearchConfig: - name: concurrent_requests field_type: usize required: true + - name: r#type + field_type: String + required: true FsConfig: fields: - name: fs.path diff --git a/src/frontend/src/handler/create_aggregate.rs b/src/frontend/src/handler/create_aggregate.rs index 32f326db9b1d9..85ba343fef408 100644 --- a/src/frontend/src/handler/create_aggregate.rs +++ b/src/frontend/src/handler/create_aggregate.rs @@ -13,6 +13,7 @@ // limitations under the License. use anyhow::Context; +use either::Either; use risingwave_common::catalog::FunctionId; use risingwave_expr::sig::{CreateFunctionOptions, UdfKind}; use risingwave_pb::catalog::function::{AggregateFunction, Kind}; @@ -20,12 +21,12 @@ use risingwave_pb::catalog::Function; use risingwave_sqlparser::ast::DataType as AstDataType; use super::*; -use crate::catalog::CatalogError; use crate::{bind_data_type, Binder}; pub async fn handle_create_aggregate( handler_args: HandlerArgs, or_replace: bool, + if_not_exists: bool, name: ObjectName, args: Vec, returns: AstDataType, @@ -74,20 +75,18 @@ pub async fn handle_create_aggregate( // resolve database and schema id let session = &handler_args.session; let db_name = &session.database(); - let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; + let (schema_name, function_name) = + Binder::resolve_schema_qualified_name(db_name, name.clone())?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; // check if the function exists in the catalog - if (session.env().catalog_reader().read_guard()) - .get_schema_by_id(&database_id, &schema_id)? - .get_function_by_name_args(&function_name, &arg_types) - .is_some() - { - let name = format!( - "{function_name}({})", - arg_types.iter().map(|t| t.to_string()).join(",") - ); - return Err(CatalogError::Duplicated("function", name).into()); + if let Either::Right(resp) = session.check_function_name_duplicated( + StatementType::CREATE_FUNCTION, + name, + &arg_types, + if_not_exists, + )? { + return Ok(resp); } let link = match ¶ms.using { diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index b87d3c90a3488..c212eaebb56f4 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -13,6 +13,7 @@ // limitations under the License. 
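The aggregate handler above (and the function handlers that follow) now delegate the duplicate-name check to a shared `SessionImpl::check_function_name_duplicated` helper, which is what makes `IF NOT EXISTS` work for UDFs: a duplicate signature becomes a notice instead of an error. A brief SQL sketch of the user-visible behavior, mirroring the new create_and_drop.slt test:

```sql
-- First creation succeeds.
create function add(a int, b int) returns int language sql as $$select $1 + $2$$;

-- Re-creating the same signature without IF NOT EXISTS fails:
--   function with name add(integer,integer) exists
-- With IF NOT EXISTS it succeeds and only raises a notice.
create function if not exists add(int, int) returns int language sql as $$select $1 + $2$$;

-- The same clause is accepted for aggregates.
create aggregate if not exists mysum(value int) returns int language python as $$
def create_state():
    return 0
def accumulate(state, value):
    return state + value
def finish(state):
    return state
$$;
```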
use anyhow::Context; +use either::Either; use risingwave_common::catalog::FunctionId; use risingwave_common::types::StructType; use risingwave_expr::sig::{CreateFunctionOptions, UdfKind}; @@ -20,13 +21,13 @@ use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction}; use risingwave_pb::catalog::Function; use super::*; -use crate::catalog::CatalogError; use crate::{bind_data_type, Binder}; pub async fn handle_create_function( handler_args: HandlerArgs, or_replace: bool, temporary: bool, + if_not_exists: bool, name: ObjectName, args: Option>, returns: Option, @@ -107,20 +108,18 @@ pub async fn handle_create_function( // resolve database and schema id let session = &handler_args.session; let db_name = &session.database(); - let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; + let (schema_name, function_name) = + Binder::resolve_schema_qualified_name(db_name, name.clone())?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; // check if the function exists in the catalog - if (session.env().catalog_reader().read_guard()) - .get_schema_by_id(&database_id, &schema_id)? - .get_function_by_name_args(&function_name, &arg_types) - .is_some() - { - let name = format!( - "{function_name}({})", - arg_types.iter().map(|t| t.to_string()).join(",") - ); - return Err(CatalogError::Duplicated("function", name).into()); + if let Either::Right(resp) = session.check_function_name_duplicated( + StatementType::CREATE_FUNCTION, + name, + &arg_types, + if_not_exists, + )? { + return Ok(resp); } let link = match ¶ms.using { diff --git a/src/frontend/src/handler/create_sql_function.rs b/src/frontend/src/handler/create_sql_function.rs index 4725b37ab6511..71a31ce5173cc 100644 --- a/src/frontend/src/handler/create_sql_function.rs +++ b/src/frontend/src/handler/create_sql_function.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use either::Either; use fancy_regex::Regex; use risingwave_common::catalog::FunctionId; use risingwave_common::types::{DataType, StructType}; @@ -23,7 +24,6 @@ use risingwave_sqlparser::parser::{Parser, ParserError}; use super::*; use crate::binder::UdfContext; -use crate::catalog::CatalogError; use crate::expr::{Expr, ExprImpl, Literal}; use crate::{bind_data_type, Binder}; @@ -122,6 +122,7 @@ pub async fn handle_create_sql_function( handler_args: HandlerArgs, or_replace: bool, temporary: bool, + if_not_exists: bool, name: ObjectName, args: Option>, returns: Option, @@ -214,20 +215,18 @@ pub async fn handle_create_sql_function( // resolve database and schema id let session = &handler_args.session; let db_name = &session.database(); - let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; + let (schema_name, function_name) = + Binder::resolve_schema_qualified_name(db_name, name.clone())?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; // check if function exists - if (session.env().catalog_reader().read_guard()) - .get_schema_by_id(&database_id, &schema_id)? - .get_function_by_name_args(&function_name, &arg_types) - .is_some() - { - let name = format!( - "{function_name}({})", - arg_types.iter().map(|t| t.to_string()).join(",") - ); - return Err(CatalogError::Duplicated("function", name).into()); + if let Either::Right(resp) = session.check_function_name_duplicated( + StatementType::CREATE_FUNCTION, + name, + &arg_types, + if_not_exists, + )? 
{ + return Ok(resp); } // Parse function body here diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index e5d20ce289724..5f55f9aac378c 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -220,6 +220,7 @@ async fn do_handle_explain( worker_node_manager_reader, session.env().catalog_reader().clone(), session.config().batch_parallelism().0, + session.config().timezone().to_owned(), plan.clone(), )?); batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot { diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 446187335b06a..4245e66c3034a 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -198,8 +198,13 @@ impl HandlerArgs { fn normalize_sql(stmt: &Statement) -> String { let mut stmt = stmt.clone(); match &mut stmt { - Statement::CreateView { or_replace, .. } => { + Statement::CreateView { + or_replace, + if_not_exists, + .. + } => { *or_replace = false; + *if_not_exists = false; } Statement::CreateTable { or_replace, @@ -273,6 +278,7 @@ pub async fn handle( Statement::CreateFunction { or_replace, temporary, + if_not_exists, name, args, returns, @@ -293,6 +299,7 @@ pub async fn handle( handler_args, or_replace, temporary, + if_not_exists, name, args, returns, @@ -305,6 +312,7 @@ pub async fn handle( handler_args, or_replace, temporary, + if_not_exists, name, args, returns, @@ -315,6 +323,7 @@ pub async fn handle( } Statement::CreateAggregate { or_replace, + if_not_exists, name, args, returns, @@ -324,6 +333,7 @@ pub async fn handle( create_aggregate::handle_create_aggregate( handler_args, or_replace, + if_not_exists, name, args, returns, diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 4f4b1f2187aa0..ec5a3ee393d40 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -373,6 +373,7 @@ pub fn gen_batch_plan_fragmenter( worker_node_manager_reader, session.env().catalog_reader().clone(), session.config().batch_parallelism().0, + session.config().timezone().to_owned(), plan, )?; diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index a843aa1b59fe4..cf6319c894a0e 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -734,6 +734,7 @@ pub(crate) mod tests { worker_node_selector, catalog_reader, None, + "UTC".to_owned(), batch_exchange_node.clone(), ) .unwrap(); diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 76f5428348003..3949f1f2d9f5d 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use anyhow::anyhow; use async_recursion::async_recursion; +use chrono::{MappedLocalTime, TimeZone}; use enum_as_inner::EnumAsInner; use futures::TryStreamExt; use iceberg::expr::Predicate as IcebergPredicate; @@ -33,6 +34,7 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::{Schema, TableDesc}; use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::types::Timestamptz; use risingwave_common::util::scan_range::ScanRange; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use 
risingwave_connector::source::filesystem::opendal_source::{ @@ -145,6 +147,7 @@ pub struct BatchPlanFragmenter { catalog_reader: CatalogReader, batch_parallelism: usize, + timezone: String, stage_graph_builder: Option, stage_graph: Option, @@ -163,6 +166,7 @@ impl BatchPlanFragmenter { worker_node_manager: WorkerNodeSelector, catalog_reader: CatalogReader, batch_parallelism: Option, + timezone: String, batch_node: PlanRef, ) -> SchedulerResult { // if batch_parallelism is None, it means no limit, we will use the available nodes count as @@ -186,6 +190,7 @@ impl BatchPlanFragmenter { worker_node_manager, catalog_reader, batch_parallelism, + timezone, stage_graph_builder: Some(StageGraphBuilder::new(batch_parallelism)), stage_graph: None, }; @@ -311,7 +316,11 @@ impl SourceScanInfo { Self::Incomplete(fetch_info) } - pub async fn complete(self, batch_parallelism: usize) -> SchedulerResult { + pub async fn complete( + self, + batch_parallelism: usize, + timezone: String, + ) -> SchedulerResult { let fetch_info = match self { SourceScanInfo::Incomplete(fetch_info) => fetch_info, SourceScanInfo::Complete(_) => { @@ -382,15 +391,36 @@ impl SourceScanInfo { Some(AsOf::VersionString(_)) => { bail!("Unsupported version string in iceberg time travel") } - Some(AsOf::TimestampString(ts)) => Some( - speedate::DateTime::parse_str_rfc3339(&ts) - .map(|t| { - IcebergTimeTravelInfo::TimestampMs( - t.timestamp_tz() * 1000 + t.time.microsecond as i64 / 1000, - ) - }) - .map_err(|_e| anyhow!("fail to parse timestamp"))?, - ), + Some(AsOf::TimestampString(ts)) => { + let date_time = speedate::DateTime::parse_str_rfc3339(&ts) + .map_err(|_e| anyhow!("fail to parse timestamp"))?; + let timestamp = if date_time.time.tz_offset.is_none() { + // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command. + let tz = + Timestamptz::lookup_time_zone(&timezone).map_err(|e| anyhow!(e))?; + match tz.with_ymd_and_hms( + date_time.date.year.into(), + date_time.date.month.into(), + date_time.date.day.into(), + date_time.time.hour.into(), + date_time.time.minute.into(), + date_time.time.second.into(), + ) { + MappedLocalTime::Single(d) => Ok(d.timestamp()), + MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => { + Err(anyhow!(format!( + "failed to parse the timestamp {ts} with the specified time zone {tz}" + ))) + } + }? + } else { + date_time.timestamp_tz() + }; + + Some(IcebergTimeTravelInfo::TimestampMs( + timestamp * 1000 + date_time.time.microsecond as i64 / 1000, + )) + } Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => { unreachable!() } @@ -731,6 +761,7 @@ impl StageGraph { self, catalog_reader: &CatalogReader, worker_node_manager: &WorkerNodeSelector, + timezone: String, ) -> SchedulerResult { let mut complete_stages = HashMap::new(); self.complete_stage( @@ -739,6 +770,7 @@ impl StageGraph { &mut complete_stages, catalog_reader, worker_node_manager, + timezone, ) .await?; Ok(StageGraph { @@ -758,6 +790,7 @@ impl StageGraph { complete_stages: &mut HashMap, catalog_reader: &CatalogReader, worker_node_manager: &WorkerNodeSelector, + timezone: String, ) -> SchedulerResult<()> { let parallelism = if stage.parallelism.is_some() { // If the stage has parallelism, it means it's a complete stage. @@ -772,7 +805,7 @@ impl StageGraph { .as_ref() .unwrap() .clone() - .complete(self.batch_parallelism) + .complete(self.batch_parallelism, timezone.to_owned()) .await?; // For batch reading file source, the number of files involved is typically large. 
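The `SourceScanInfo::complete` change above threads the session time zone into Iceberg time-travel timestamp parsing: an `AS OF` timestamp string without an explicit offset is now resolved in the zone set by `SET TIME ZONE`, ambiguous or non-existent local times are rejected, and strings that carry an offset keep their previous meaning. A hedged sketch of the resulting behavior, assuming the usual `FOR SYSTEM_TIME AS OF` batch time-travel syntax (the source name `iceberg_t` is illustrative):

```sql
set time zone 'Asia/Shanghai';

-- No offset in the literal: interpreted as 2024-01-01 00:00:00 +08:00.
select * from iceberg_t for system_time as of '2024-01-01 00:00:00';

-- An explicit offset still takes precedence over the session setting.
select * from iceberg_t for system_time as of '2024-01-01T00:00:00+00:00';
```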
@@ -842,6 +875,7 @@ impl StageGraph { complete_stages, catalog_reader, worker_node_manager, + timezone.to_owned(), ) .await?; } @@ -935,7 +969,11 @@ impl BatchPlanFragmenter { pub async fn generate_complete_query(self) -> SchedulerResult { let stage_graph = self.stage_graph.unwrap(); let new_stage_graph = stage_graph - .complete(&self.catalog_reader, &self.worker_node_manager) + .complete( + &self.catalog_reader, + &self.worker_node_manager, + self.timezone.to_owned(), + ) .await?; Ok(Query { query_id: self.query_id, diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index fe03af7c0c786..7f99d91d6e0d5 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -22,6 +22,7 @@ use std::time::{Duration, Instant}; use anyhow::anyhow; use bytes::Bytes; use either::Either; +use itertools::Itertools; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; use pgwire::error::{PsqlError, PsqlResult}; use pgwire::net::{Address, AddressRef}; @@ -961,6 +962,44 @@ impl SessionImpl { .map_err(RwError::from) } + pub fn check_function_name_duplicated( + &self, + stmt_type: StatementType, + name: ObjectName, + arg_types: &[DataType], + if_not_exists: bool, + ) -> Result> { + let db_name = &self.database(); + let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; + let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?; + + let catalog_reader = self.env().catalog_reader().read_guard(); + if catalog_reader + .get_schema_by_id(&database_id, &schema_id)? + .get_function_by_name_args(&function_name, arg_types) + .is_some() + { + let full_name = format!( + "{function_name}({})", + arg_types.iter().map(|t| t.to_string()).join(",") + ); + if if_not_exists { + Ok(Either::Right( + PgResponse::builder(stmt_type) + .notice(format!( + "function \"{}\" already exists, skipping", + full_name + )) + .into(), + )) + } else { + Err(CatalogError::Duplicated("function", full_name).into()) + } + } else { + Ok(Either::Left(())) + } + } + /// Also check if the user has the privilege to create in the schema. 
pub fn get_database_and_schema_id_for_create( &self, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index f30e20b50f0b8..8676b2583227c 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -17,6 +17,7 @@ use std::num::NonZeroUsize; use anyhow::anyhow; use itertools::Itertools; +use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::VnodeCountCompat; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; @@ -119,9 +120,10 @@ impl CatalogController { let txn = inner.db.begin().await?; let create_type = streaming_job.create_type(); - let streaming_parallelism = match parallelism { - None => StreamingParallelism::Adaptive, - Some(n) => StreamingParallelism::Fixed(n.parallelism as _), + let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) { + (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive, + (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()), + (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _), }; ensure_user_id(streaming_job.owner() as _, &txn).await?; diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 5f9664b64a011..bb47248686129 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -179,7 +179,7 @@ impl HummockManager { .change_log_delta .values() .flat_map(|change_log| { - let new_log = change_log.new_log.as_ref().unwrap(); + let new_log = &change_log.new_log; new_log .new_value .iter() diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 7bd7034ae1f99..eba07b7002244 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -535,7 +535,7 @@ impl HummockManager { break; } let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect(); - tracing::debug!(?delete_batch, "Attempt to delete objects."); + tracing::info!(?delete_batch, "Attempt to delete objects."); let deleted_object_ids = delete_batch.clone(); self.gc_manager .delete_objects(delete_batch.into_iter()) diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 16496b71c97eb..98f8599ccf89a 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1349,6 +1349,7 @@ pub enum Statement { CreateFunction { or_replace: bool, temporary: bool, + if_not_exists: bool, name: ObjectName, args: Option>, returns: Option, @@ -1361,6 +1362,7 @@ pub enum Statement { /// Postgres: CreateAggregate { or_replace: bool, + if_not_exists: bool, name: ObjectName, args: Vec, returns: DataType, @@ -1768,6 +1770,7 @@ impl fmt::Display for Statement { Statement::CreateFunction { or_replace, temporary, + if_not_exists, name, args, returns, @@ -1776,9 +1779,10 @@ impl fmt::Display for Statement { } => { write!( f, - "CREATE {or_replace}{temp}FUNCTION {name}", + "CREATE {or_replace}{temp}FUNCTION {if_not_exists}{name}", temp = if *temporary { "TEMPORARY " } else { "" }, or_replace = if *or_replace { "OR REPLACE " } else { "" }, + if_not_exists = if *if_not_exists { "IF NOT EXISTS " } else { "" }, )?; if let Some(args) = args { write!(f, "({})", display_comma_separated(args))?; @@ -1792,6 +1796,7 @@ impl fmt::Display for Statement { } Statement::CreateAggregate { or_replace, + if_not_exists, name, args, returns, @@ -1800,8 +1805,9 @@ 
impl fmt::Display for Statement { } => { write!( f, - "CREATE {or_replace}AGGREGATE {name}", + "CREATE {or_replace}AGGREGATE {if_not_exists}{name}", or_replace = if *or_replace { "OR REPLACE " } else { "" }, + if_not_exists = if *if_not_exists { "IF NOT EXISTS " } else { "" }, )?; write!(f, "({})", display_comma_separated(args))?; write!(f, " RETURNS {}", returns)?; @@ -3551,8 +3557,9 @@ mod tests { #[test] fn test_create_function_display() { let create_function = Statement::CreateFunction { - temporary: false, or_replace: false, + temporary: false, + if_not_exists: false, name: ObjectName(vec![Ident::new_unchecked("foo")]), args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]), returns: Some(CreateFunctionReturns::Value(DataType::Int)), @@ -3573,8 +3580,9 @@ mod tests { format!("{}", create_function) ); let create_function = Statement::CreateFunction { - temporary: false, or_replace: false, + temporary: false, + if_not_exists: false, name: ObjectName(vec![Ident::new_unchecked("foo")]), args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]), returns: Some(CreateFunctionReturns::Value(DataType::Int)), diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 9eb3d9e439967..2df0183cf5c5a 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2210,6 +2210,8 @@ impl Parser<'_> { or_replace: bool, temporary: bool, ) -> PResult { + impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self); + let name = self.parse_object_name()?; self.expect_token(&Token::LParen)?; let args = if self.peek_token().token == Token::RParen { @@ -2248,6 +2250,7 @@ impl Parser<'_> { Ok(Statement::CreateFunction { or_replace, temporary, + if_not_exists, name, args, returns: return_type, @@ -2257,6 +2260,8 @@ impl Parser<'_> { } fn parse_create_aggregate(&mut self, or_replace: bool) -> PResult { + impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self); + let name = self.parse_object_name()?; self.expect_token(&Token::LParen)?; let args = self.parse_comma_separated(Parser::parse_function_arg)?; @@ -2270,6 +2275,7 @@ impl Parser<'_> { Ok(Statement::CreateAggregate { or_replace, + if_not_exists, name, args, returns, diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index 7acf6d29b4444..1466a9024a6d5 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -753,6 +753,7 @@ fn parse_create_function() { Statement::CreateFunction { or_replace: false, temporary: false, + if_not_exists: false, name: ObjectName(vec![Ident::new_unchecked("add")]), args: Some(vec![ OperateFunctionArg::unnamed(DataType::Int), @@ -777,6 +778,7 @@ fn parse_create_function() { Statement::CreateFunction { or_replace: false, temporary: false, + if_not_exists: false, name: ObjectName(vec![Ident::new_unchecked("sub")]), args: Some(vec![ OperateFunctionArg::unnamed(DataType::Int), @@ -801,6 +803,7 @@ fn parse_create_function() { Statement::CreateFunction { or_replace: false, temporary: false, + if_not_exists: false, name: ObjectName(vec![Ident::new_unchecked("return_test")]), args: Some(vec![ OperateFunctionArg::unnamed(DataType::Int), @@ -826,6 +829,7 @@ fn parse_create_function() { Statement::CreateFunction { or_replace: true, temporary: false, + if_not_exists: false, name: ObjectName(vec![Ident::new_unchecked("add")]), args: Some(vec![ OperateFunctionArg::with_name("a", DataType::Int), @@ -851,12 +855,14 @@ fn parse_create_function() { } ); - let sql 
= "CREATE FUNCTION unnest(a INT[]) RETURNS TABLE (x INT) LANGUAGE SQL RETURN a"; + let sql = + "CREATE TEMPORARY FUNCTION unnest(a INT[]) RETURNS TABLE (x INT) LANGUAGE SQL RETURN a"; assert_eq!( verified_stmt(sql), Statement::CreateFunction { or_replace: false, - temporary: false, + temporary: true, + if_not_exists: false, name: ObjectName(vec![Ident::new_unchecked("unnest")]), args: Some(vec![OperateFunctionArg::with_name( "a", @@ -874,6 +880,32 @@ fn parse_create_function() { with_options: Default::default(), } ); + + let sql = + "CREATE FUNCTION IF NOT EXISTS add(INT, INT) RETURNS INT LANGUAGE SQL IMMUTABLE AS 'select $1 + $2;'"; + assert_eq!( + verified_stmt(sql), + Statement::CreateFunction { + or_replace: false, + temporary: false, + if_not_exists: true, + name: ObjectName(vec![Ident::new_unchecked("add")]), + args: Some(vec![ + OperateFunctionArg::unnamed(DataType::Int), + OperateFunctionArg::unnamed(DataType::Int), + ]), + returns: Some(CreateFunctionReturns::Value(DataType::Int)), + params: CreateFunctionBody { + language: Some("SQL".into()), + behavior: Some(FunctionBehavior::Immutable), + as_: Some(FunctionDefinition::SingleQuotedDef( + "select $1 + $2;".into() + )), + ..Default::default() + }, + with_options: Default::default(), + } + ); } #[test] @@ -884,6 +916,27 @@ fn parse_create_aggregate() { verified_stmt(sql), Statement::CreateAggregate { or_replace: true, + if_not_exists: false, + name: ObjectName(vec![Ident::new_unchecked("sum")]), + args: vec![OperateFunctionArg::unnamed(DataType::Int)], + returns: DataType::BigInt, + append_only: true, + params: CreateFunctionBody { + language: Some("python".into()), + as_: Some(FunctionDefinition::SingleQuotedDef("sum".into())), + using: Some(CreateFunctionUsing::Link("xxx".into())), + ..Default::default() + }, + } + ); + + let sql = + "CREATE AGGREGATE IF NOT EXISTS sum(INT) RETURNS BIGINT APPEND ONLY LANGUAGE python AS 'sum' USING LINK 'xxx'"; + assert_eq!( + verified_stmt(sql), + Statement::CreateAggregate { + or_replace: false, + if_not_exists: true, name: ObjectName(vec![Ident::new_unchecked("sum")]), args: vec![OperateFunctionArg::unnamed(DataType::Int)], returns: DataType::BigInt, diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 5a7bf0143c764..5a5c4a647ecbe 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -190,11 +190,11 @@ pub fn build_table_change_log_delta<'a>( TableId::new(table_id), ChangeLogDelta { truncate_epoch, - new_log: Some(EpochNewChangeLog { + new_log: EpochNewChangeLog { new_value: vec![], old_value: vec![], epochs: epochs.clone(), - }), + }, }, ) }) @@ -203,7 +203,7 @@ pub fn build_table_change_log_delta<'a>( for table_id in &sst.table_ids { match table_change_log.get_mut(&TableId::new(*table_id)) { Some(log) => { - log.new_log.as_mut().unwrap().old_value.push(sst.clone()); + log.new_log.old_value.push(sst.clone()); } None => { warn!(table_id, ?sst, "old value sst contains non-log-store table"); @@ -214,7 +214,7 @@ pub fn build_table_change_log_delta<'a>( for sst in new_value_ssts { for table_id in &sst.table_ids { if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) { - log.new_log.as_mut().unwrap().new_value.push(sst.clone()); + log.new_log.new_value.push(sst.clone()); } } } @@ -224,7 +224,7 @@ pub fn build_table_change_log_delta<'a>( #[derive(Debug, PartialEq, Clone)] pub struct ChangeLogDeltaCommon { pub truncate_epoch: u64, - pub new_log: Option>, + pub new_log: 
EpochNewChangeLogCommon, } pub type ChangeLogDelta = ChangeLogDeltaCommon; @@ -236,7 +236,7 @@ where fn from(val: &ChangeLogDeltaCommon) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.as_ref().map(|a| a.into()), + new_log: Some((&val.new_log).into()), } } } @@ -248,7 +248,7 @@ where fn from(val: &PbChangeLogDelta) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.as_ref().map(|a| a.into()), + new_log: val.new_log.as_ref().unwrap().into(), } } } @@ -260,7 +260,7 @@ where fn from(val: ChangeLogDeltaCommon) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.map(|a| a.into()), + new_log: Some(val.new_log.into()), } } } @@ -272,7 +272,7 @@ where fn from(val: PbChangeLogDelta) -> Self { Self { truncate_epoch: val.truncate_epoch, - new_log: val.new_log.map(|a| a.into()), + new_log: val.new_log.unwrap().into(), } } } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ee316f75ffd65..eb4bb30e69dc3 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -693,7 +693,7 @@ impl HummockVersion { changed_table_info: &HashMap>, ) { for (table_id, change_log_delta) in change_log_delta { - let new_change_log = change_log_delta.new_log.as_ref().unwrap(); + let new_change_log = &change_log_delta.new_log; match table_change_log.entry(*table_id) { Entry::Occupied(entry) => { let change_log = entry.into_mut(); diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 4840b402a292b..11eb045efbe22 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -151,14 +151,12 @@ impl FrontendHummockVersionDelta { *table_id, ChangeLogDeltaCommon { truncate_epoch: change_log_delta.truncate_epoch, - new_log: change_log_delta.new_log.as_ref().map(|new_log| { - EpochNewChangeLogCommon { - // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` - new_value: vec![(); new_log.new_value.len()], - old_value: vec![(); new_log.old_value.len()], - epochs: new_log.epochs.clone(), - } - }), + new_log: EpochNewChangeLogCommon { + // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` + new_value: vec![(); change_log_delta.new_log.new_value.len()], + old_value: vec![(); change_log_delta.new_log.old_value.len()], + epochs: change_log_delta.new_log.epochs.clone(), + }, }, ) }) @@ -187,11 +185,17 @@ impl FrontendHummockVersionDelta { ( table_id.table_id, PbChangeLogDelta { - new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog { + new_log: Some(PbEpochNewChangeLog { // Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()` - old_value: vec![PbSstableInfo::default(); new_log.old_value.len()], - new_value: vec![PbSstableInfo::default(); new_log.new_value.len()], - epochs: new_log.epochs.clone(), + old_value: vec![ + PbSstableInfo::default(); + delta.new_log.old_value.len() + ], + new_value: vec![ + PbSstableInfo::default(); + delta.new_log.new_value.len() + ], + epochs: delta.new_log.epochs.clone(), }), truncate_epoch: delta.truncate_epoch, }, @@ -228,14 +232,18 @@ impl FrontendHummockVersionDelta { TableId::new(*table_id), ChangeLogDeltaCommon { 
truncate_epoch: change_log_delta.truncate_epoch, - new_log: change_log_delta.new_log.as_ref().map(|new_log| { - EpochNewChangeLogCommon { - // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` - new_value: vec![(); new_log.new_value.len()], - old_value: vec![(); new_log.old_value.len()], - epochs: new_log.epochs.clone(), - } - }), + new_log: change_log_delta + .new_log + .as_ref() + .map(|new_log| { + EpochNewChangeLogCommon { + // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()` + new_value: vec![(); new_log.new_value.len()], + old_value: vec![(); new_log.old_value.len()], + epochs: new_log.epochs.clone(), + } + }) + .unwrap(), }, ) }) diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 0a651260b3789..9f628808909ff 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -191,15 +191,15 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockV } debug_assert!(log_delta .new_log - .as_ref() - .map(|d| { - d.new_value.iter().chain(d.old_value.iter()).all(|s| { - s.table_ids - .iter() - .any(|tid| time_travel_table_ids.contains(tid)) - }) - }) - .unwrap_or(true)); + .new_value + .iter() + .chain(log_delta.new_log.old_value.iter()) + .all(|s| { + s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + })); + Some((*table_id, PbChangeLogDelta::from(log_delta).into())) }) .collect(), diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 09e96860cc839..4c90e6cae47f1 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -570,7 +570,7 @@ where }) .chain(self.change_log_delta.values().flat_map(|delta| { // TODO: optimization: strip table change log - let new_log = delta.new_log.as_ref().unwrap(); + let new_log = &delta.new_log; new_log.new_value.iter().chain(new_log.old_value.iter()) })) } @@ -623,8 +623,8 @@ where ( TableId::new(*table_id), ChangeLogDeltaCommon { - new_log: log_delta.new_log.as_ref().map(Into::into), truncate_epoch: log_delta.truncate_epoch, + new_log: log_delta.new_log.as_ref().unwrap().into(), }, ) }) @@ -752,7 +752,7 @@ where ( TableId::new(*table_id), ChangeLogDeltaCommon { - new_log: log_delta.new_log.clone().map(Into::into), + new_log: log_delta.new_log.clone().unwrap().into(), truncate_epoch: log_delta.truncate_epoch, }, ) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 21f113b7b9858..a3fd28bb54293 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -14,6 +14,7 @@ #![cfg_attr(not(madsim), allow(unused_imports))] +use std::cmp::max; use std::collections::HashMap; use std::future::Future; use std::io::Write; @@ -206,6 +207,37 @@ metrics_level = "Disabled" } } + pub fn for_default_parallelism(default_parallelism: usize) -> Self { + let config_path = { + let mut file = + tempfile::NamedTempFile::new().expect("failed to create temp config file"); + + let config_data = format!( + r#" +[server] +telemetry_enabled = false +metrics_level = "Disabled" +[meta] +default_parallelism = {default_parallelism} +"# + ) + .to_owned(); + file.write_all(config_data.as_bytes()) + .expect("failed to write config file"); + file.into_temp_path() + }; + + Configuration { + config_path: ConfigPath::Temp(config_path.into()), + frontend_nodes: 1, + compute_nodes: 1, + meta_nodes: 1, 
+ compactor_nodes: 1, + compute_node_cores: default_parallelism * 2, + per_session_queries: vec![].into(), + } + } + /// Returns the config for backfill test. pub fn for_backfill() -> Self { // Embed the config file and create a temporary file at runtime. The file will be deleted diff --git a/src/tests/simulation/tests/integration_tests/default_parallelism.rs b/src/tests/simulation/tests/integration_tests/default_parallelism.rs new file mode 100644 index 0000000000000..2cb79d5c3a8cf --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/default_parallelism.rs @@ -0,0 +1,56 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use risingwave_simulation::cluster::{Cluster, Configuration}; +use risingwave_simulation::utils::AssertResult; + +#[tokio::test] +async fn test_no_default_parallelism() -> Result<()> { + let mut cluster = Cluster::start(Configuration::default()).await?; + + let mut session = cluster.start_session(); + + session.run("create table t(v int);").await?; + session + .run("select parallelism from rw_streaming_parallelism where name = 't';") + .await? + .assert_result_eq("ADAPTIVE"); + + Ok(()) +} + +#[tokio::test] +async fn test_default_parallelism() -> Result<()> { + let mut cluster = Cluster::start(Configuration::for_default_parallelism(2)).await?; + + let mut session = cluster.start_session(); + + session.run("create table t(v int);").await?; + session + .run("select parallelism from rw_streaming_parallelism where name = 't';") + .await? + .assert_result_eq("FIXED(2)"); + + session + .run("alter table t set parallelism = adaptive;") + .await?; + + session + .run("select parallelism from rw_streaming_parallelism where name = 't';") + .await? + .assert_result_eq("ADAPTIVE"); + + Ok(()) +} diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs index 8244a0bb48f27..ad8e854a30e5f 100644 --- a/src/tests/simulation/tests/integration_tests/main.rs +++ b/src/tests/simulation/tests/integration_tests/main.rs @@ -29,4 +29,5 @@ mod storage; mod throttle; mod compaction; +mod default_parallelism; mod utils;
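The `for_default_parallelism` configuration and the new simulation test exercise the meta-side change in `create_streaming_job`: when the meta config sets `default_parallelism`, streaming jobs created without an explicit parallelism now come up as `FIXED(n)` rather than `ADAPTIVE`, and can still be switched back afterwards. The SQL-level effect the test asserts, in short:

```sql
-- With default_parallelism = 2 in the [meta] section of the config:
create table t (v int);

select parallelism from rw_streaming_parallelism where name = 't';
-- FIXED(2)

alter table t set parallelism = adaptive;

select parallelism from rw_streaming_parallelism where name = 't';
-- ADAPTIVE
```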