diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt
index 842e3f3303438..953bc694d744a 100644
--- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt
+++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt
@@ -22,6 +22,7 @@ CREATE TABLE test_route (
 statement ok
 CREATE SINK test_route_sink from test_route WITH (
+    type = 'upsert',
     connector = 'elasticsearch',
     index = 'test_route',
     url = 'http://elasticsearch:9200',
@@ -32,6 +33,7 @@ CREATE SINK test_route_sink from test_route WITH (
 statement ok
 CREATE SINK s7 from t7 WITH (
+    type = 'upsert',
     connector = 'elasticsearch',
     index = 'test',
     url = 'http://elasticsearch:9200',
@@ -41,6 +43,7 @@ CREATE SINK s7 from t7 WITH (
 statement ok
 CREATE SINK s8 from t7 WITH (
+    type = 'upsert',
     connector = 'elasticsearch',
     index = 'test1',
     primary_key = 'v1,v3',
diff --git a/e2e_test/udf/create_and_drop.slt b/e2e_test/udf/create_and_drop.slt
new file mode 100644
index 0000000000000..7b31dba16fdbd
--- /dev/null
+++ b/e2e_test/udf/create_and_drop.slt
@@ -0,0 +1,115 @@
+# https://github.com/risingwavelabs/risingwave/issues/17263
+
+statement ok
+create table t (a int, b int);
+
+statement ok
+create function add(a int, b int) returns int language python as $$
+def add(a, b):
+    return a+b
+$$;
+
+statement error function with name add\(integer,integer\) exists
+create function add(int, int) returns int language sql as $$select $1 + $2$$;
+
+statement ok
+create function if not exists add(int, int) returns int language sql as $$select $1 + $2$$;
+
+statement ok
+create function add_v2(int, int) returns int language sql as $$select $1 + $2$$;
+
+statement ok
+create aggregate mysum(value int) returns int language python as $$
+def create_state():
+    return 0
+def accumulate(state, value):
+    return state + value
+def finish(state):
+    return state
+$$;
+
+statement error function with name mysum\(integer\) exists
+create aggregate mysum(value int) returns int language python as $$
+def create_state():
+    return 0
+def accumulate(state, value):
+    return state + value
+def finish(state):
+    return state
+$$;
+
+statement ok
+create aggregate if not exists mysum(value int) returns int language python as $$
+def create_state():
+    return 0
+def accumulate(state, value):
+    return state + value
+def finish(state):
+    return state
+$$;
+
+statement ok
+create materialized view mv as select add(a, b) + add_v2(a, b) as c from t;
+
+statement ok
+create materialized view mv2 as select mysum(a) as s from t;
+
+statement error function used by 1 other objects
+drop function add;
+
+statement error function used by 1 other objects
+drop function if exists add;
+
+statement error function used by 1 other objects
+drop function add_v2;
+
+statement error function used by 1 other objects
+drop aggregate mysum;
+
+statement ok
+drop materialized view mv;
+
+statement ok
+drop materialized view mv2;
+
+statement ok
+drop function add;
+
+statement error function not found
+drop function add;
+
+statement ok
+drop function if exists add;
+
+statement ok
+drop function add_v2;
+
+statement ok
+drop function if exists add_v2;
+
+statement ok
+drop aggregate mysum;
+
+statement ok
+drop aggregate if exists mysum;
+
+statement ok
+create function add(a int, b int) returns int language python as $$
+def add(a, b):
+    return a+b
+$$;
+
+statement ok
+create sink s as select add(a, b) as c from t with (connector = 'blackhole');
+
+statement error function used by 1 other objects
+drop function add;
+
+statement ok
+drop sink s;
+
+statement ok
+drop function add;
+
+statement ok
+drop table t;
diff --git a/e2e_test/udf/drop_function.slt b/e2e_test/udf/drop_function.slt
deleted file mode 100644
index ffe4e0eea481f..0000000000000
--- a/e2e_test/udf/drop_function.slt
+++ /dev/null
@@ -1,52 +0,0 @@
-# https://github.com/risingwavelabs/risingwave/issues/17263
-
-statement ok
-create table t (a int, b int);
-
-statement ok
-create function add(a int, b int) returns int language python as $$
-def add(a, b):
-    return a+b
-$$;
-
-statement ok
-create function add_v2(INT, INT) returns int language sql as $$select $1 + $2$$;
-
-statement ok
-create materialized view mv as select add(a, b) + add_v2(a, b) as c from t;
-
-statement error function used by 1 other objects
-drop function add;
-
-statement error function used by 1 other objects
-drop function add_v2;
-
-statement ok
-drop materialized view mv;
-
-statement ok
-drop function add;
-
-statement ok
-drop function add_v2;
-
-statement ok
-create function add(a int, b int) returns int language python as $$
-def add(a, b):
-    return a+b
-$$;
-
-statement ok
-create sink s as select add(a, b) as c from t with (connector = 'blackhole');
-
-statement error function used by 1 other objects
-drop function add;
-
-statement ok
-drop sink s;
-
-statement ok
-drop function add;
-
-statement ok
-drop table t;
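The `statement error` expectations above match on a full signature string such as `add(integer,integer)`. A minimal sketch of how such a `name(arg,...)` label can be assembled from a function name plus argument types — `DataType` here is a stand-in enum, not RisingWave's real type:

    // Builds the "name(type,type)" label used in duplicate/not-found errors.
    #[derive(Debug)]
    enum DataType {
        Integer,
    }

    impl std::fmt::Display for DataType {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                DataType::Integer => write!(f, "integer"),
            }
        }
    }

    fn full_function_name(name: &str, arg_types: &[DataType]) -> String {
        let args = arg_types
            .iter()
            .map(|t| t.to_string())
            .collect::<Vec<_>>()
            .join(",");
        format!("{name}({args})")
    }

    fn main() {
        assert_eq!(
            full_function_name("add", &[DataType::Integer, DataType::Integer]),
            "add(integer,integer)"
        );
    }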
"pin-project-lite", "socket2 0.5.4", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1037,7 +1044,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", ] [[package]] @@ -1046,13 +1062,29 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -1061,38 +1093,86 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs index 1f0ac754fcad2..fd4cbed31698d 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs @@ -28,6 +28,7 @@ pub struct ElasticSearchSink { config: ElasticSearchOpenSearchConfig, schema: Schema, pk_indices: Vec, + is_append_only: bool, } #[async_trait] @@ -41,6 +42,7 @@ impl TryFrom for ElasticSearchSink { config, schema, pk_indices: param.downstream_pk, + is_append_only: param.sink_type.is_append_only(), }) } } @@ -64,6 +66,7 @@ impl Sink for ElasticSearchSink { self.schema.clone(), self.pk_indices.clone(), Self::SINK_NAME, + self.is_append_only, )? 
diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs
index 1f0ac754fcad2..fd4cbed31698d 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch.rs
@@ -28,6 +28,7 @@ pub struct ElasticSearchSink {
     config: ElasticSearchOpenSearchConfig,
     schema: Schema,
     pk_indices: Vec<usize>,
+    is_append_only: bool,
 }

 #[async_trait]
@@ -41,6 +42,7 @@ impl TryFrom<SinkParam> for ElasticSearchSink {
             config,
             schema,
             pk_indices: param.downstream_pk,
+            is_append_only: param.sink_type.is_append_only(),
         })
     }
 }
@@ -64,6 +66,7 @@ impl Sink for ElasticSearchSink {
             self.schema.clone(),
             self.pk_indices.clone(),
             Self::SINK_NAME,
+            self.is_append_only,
         )?
         .into_log_sinker(self.config.concurrent_requests))
     }
diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs
index dc19c99261aa7..ba2571ff2a130 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_converter.rs
@@ -39,6 +39,7 @@ impl StreamChunkConverter {
         schema: Schema,
         pk_indices: &Vec<usize>,
         properties: &BTreeMap<String, String>,
+        is_append_only: bool,
     ) -> Result<Self> {
         if is_remote_es_sink(sink_name) {
             let index_column = properties
@@ -71,6 +72,7 @@ impl StreamChunkConverter {
                 index_column,
                 index,
                 routing_column,
+                is_append_only,
             )?))
         } else {
             Ok(StreamChunkConverter::Other)
@@ -79,13 +81,14 @@ impl StreamChunkConverter {

     pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
         match self {
-            StreamChunkConverter::Es(es) => es.convert_chunk(chunk),
+            StreamChunkConverter::Es(es) => es.convert_chunk(chunk, es.is_append_only),
             StreamChunkConverter::Other => Ok(chunk),
         }
     }
 }
 pub struct EsStreamChunkConverter {
     formatter: ElasticSearchOpenSearchFormatter,
+    is_append_only: bool,
 }
 impl EsStreamChunkConverter {
     pub fn new(
@@ -95,6 +98,7 @@ impl EsStreamChunkConverter {
         index_column: Option<usize>,
         index: Option<String>,
         routing_column: Option<usize>,
+        is_append_only: bool,
     ) -> Result<Self> {
         let formatter = ElasticSearchOpenSearchFormatter::new(
             pk_indices,
@@ -104,10 +108,13 @@ impl EsStreamChunkConverter {
             index,
             routing_column,
         )?;
-        Ok(Self { formatter })
+        Ok(Self {
+            formatter,
+            is_append_only,
+        })
     }

-    fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
+    fn convert_chunk(&self, chunk: StreamChunk, is_append_only: bool) -> Result<StreamChunk> {
         let mut ops = Vec::with_capacity(chunk.capacity());
         let mut id_string_builder =
             <Utf8ArrayBuilder as ArrayBuilder>::new(chunk.capacity());
@@ -117,7 +124,7 @@ impl EsStreamChunkConverter {
             <Utf8ArrayBuilder as ArrayBuilder>::new(chunk.capacity());
         let mut routing_builder =
             <Utf8ArrayBuilder as ArrayBuilder>::new(chunk.capacity());
-        for build_bulk_para in self.formatter.convert_chunk(chunk)? {
+        for build_bulk_para in self.formatter.convert_chunk(chunk, is_append_only)? {
             let BuildBulkPara {
                 key,
                 value,
diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs
index 5369fcdee6aab..eb02d1310b650 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs
@@ -160,6 +160,7 @@ pub struct ElasticSearchOpenSearchSinkWriter {
     client: Arc<ElasticSearchOpenSearchClient>,
     formatter: ElasticSearchOpenSearchFormatter,
     config: ElasticSearchOpenSearchConfig,
+    is_append_only: bool,
 }

 impl ElasticSearchOpenSearchSinkWriter {
@@ -168,6 +169,7 @@ impl ElasticSearchOpenSearchSinkWriter {
         schema: Schema,
         pk_indices: Vec<usize>,
         connector: &str,
+        is_append_only: bool,
     ) -> Result<Self> {
         let client = Arc::new(config.build_client(connector)?);
         let formatter = ElasticSearchOpenSearchFormatter::new(
@@ -182,6 +184,7 @@ impl ElasticSearchOpenSearchSinkWriter {
             client,
             formatter,
             config,
+            is_append_only,
         })
     }
 }
@@ -202,7 +205,7 @@ impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter {
         let mut bulks: Vec<_> = Vec::with_capacity(chunk_capacity);

         let mut bulks_size = 0;
-        for build_bulk_para in self.formatter.convert_chunk(chunk)? {
+        for build_bulk_para in self.formatter.convert_chunk(chunk, self.is_append_only)? {
             let BuildBulkPara {
                 key,
                 value,
diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
index e2907301d78a9..24f79dd84030b 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
@@ -77,6 +77,8 @@ pub struct ElasticSearchOpenSearchConfig {
     #[serde_as(as = "DisplayFromStr")]
     #[serde(default = "default_concurrent_requests")]
     pub concurrent_requests: usize,
+
+    pub r#type: String,
 }

 fn default_retry_on_conflict() -> i32 {
diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs
index 660e26d578834..3cfda0601b617 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs
@@ -113,7 +113,11 @@ impl ElasticSearchOpenSearchFormatter {
         })
     }

-    pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<Vec<BuildBulkPara>> {
+    pub fn convert_chunk(
+        &self,
+        chunk: StreamChunk,
+        is_append_only: bool,
+    ) -> Result<Vec<BuildBulkPara>> {
         let mut result_vec = Vec::with_capacity(chunk.capacity());
         for (op, rows) in chunk.rows() {
             let index = if let Some(index_column) = self.index_column {
@@ -157,6 +161,11 @@ impl ElasticSearchOpenSearchFormatter {
                     });
                 }
                 Op::Delete => {
+                    if is_append_only {
+                        return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
+                            "`Delete` operation is not supported in `append_only` mode"
+                        )));
+                    }
                     let key = self.key_encoder.encode(rows)?;
                     let mem_size_b = std::mem::size_of_val(&key);
                     result_vec.push(BuildBulkPara {
@@ -167,7 +176,15 @@ impl ElasticSearchOpenSearchFormatter {
                         routing_column,
                     });
                 }
-                Op::UpdateDelete => continue,
+                Op::UpdateDelete => {
+                    if is_append_only {
+                        return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
+                            "`UpdateDelete` operation is not supported in `append_only` mode"
+                        )));
+                    } else {
+                        continue;
+                    }
+                }
             }
         }
         Ok(result_vec)
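With the sink's `is_append_only` flag now threaded into the formatter, delete-shaped ops fail fast instead of being encoded (or, in the `UpdateDelete` case, silently skipped). A simplified sketch of that guard — `Op` and the error type are stand-ins, not RisingWave's real definitions:

    #[derive(Debug, Clone, Copy)]
    enum Op {
        Insert,
        Delete,
        UpdateDelete,
        UpdateInsert,
    }

    // In append-only mode, anything that implies a deletion is an error.
    fn check_op(op: Op, is_append_only: bool) -> Result<(), String> {
        match op {
            Op::Delete | Op::UpdateDelete if is_append_only => Err(format!(
                "`{op:?}` operation is not supported in `append_only` mode"
            )),
            _ => Ok(()),
        }
    }

    fn main() {
        assert!(check_op(Op::Insert, true).is_ok());
        assert!(check_op(Op::UpdateInsert, true).is_ok());
        assert!(check_op(Op::Delete, true).is_err());
        // In upsert mode every op passes through to the bulk builder.
        assert!(check_op(Op::Delete, false).is_ok());
    }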
diff --git a/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs b/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs
index b0df3c8ca2345..bf9b79f6a8850 100644
--- a/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs
+++ b/src/connector/src/sink/elasticsearch_opensearch/opensearch.rs
@@ -30,6 +30,7 @@ pub struct OpenSearchSink {
     config: ElasticSearchOpenSearchConfig,
     schema: Schema,
     pk_indices: Vec<usize>,
+    is_append_only: bool,
 }

 #[async_trait]
@@ -43,6 +44,7 @@ impl TryFrom<SinkParam> for OpenSearchSink {
             config,
             schema,
             pk_indices: param.downstream_pk,
+            is_append_only: param.sink_type.is_append_only(),
         })
     }
 }
@@ -69,6 +71,7 @@ impl Sink for OpenSearchSink {
             self.schema.clone(),
             self.pk_indices.clone(),
             Self::SINK_NAME,
+            self.is_append_only,
         )?
         .into_log_sinker(self.config.concurrent_requests))
     }
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 26683966f566b..582a5c861bb5c 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -301,6 +301,7 @@ impl RemoteLogSinker {
                 sink_param.schema(),
                 &sink_param.downstream_pk,
                 &sink_param.properties,
+                sink_param.sink_type.is_append_only(),
             )?,
         })
     }
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index d7291c8ecc61d..819e597fec448 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -331,6 +331,9 @@ ElasticSearchOpenSearchConfig:
   - name: concurrent_requests
     field_type: usize
     required: true
+  - name: r#type
+    field_type: String
+    required: true
 FsConfig:
   fields:
   - name: fs.path
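The new required option surfaces in SQL as `type = '...'` (as pinned in the slt changes at the top of this diff), while the Rust field must be spelled `r#type` because `type` is a keyword. A stand-in sketch of the raw-identifier mechanics, not the real config struct:

    #[derive(Debug)]
    struct SinkConfigSketch {
        // Raw identifier: the field is literally named `type`.
        r#type: String,
    }

    fn main() {
        let cfg = SinkConfigSketch {
            r#type: "upsert".to_owned(),
        };
        // Field access uses the same raw-identifier syntax.
        assert_eq!(cfg.r#type, "upsert");
    }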
diff --git a/src/frontend/src/handler/create_aggregate.rs b/src/frontend/src/handler/create_aggregate.rs
index 32f326db9b1d9..85ba343fef408 100644
--- a/src/frontend/src/handler/create_aggregate.rs
+++ b/src/frontend/src/handler/create_aggregate.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 use anyhow::Context;
+use either::Either;
 use risingwave_common::catalog::FunctionId;
 use risingwave_expr::sig::{CreateFunctionOptions, UdfKind};
 use risingwave_pb::catalog::function::{AggregateFunction, Kind};
@@ -20,12 +21,12 @@ use risingwave_pb::catalog::Function;
 use risingwave_sqlparser::ast::DataType as AstDataType;

 use super::*;
-use crate::catalog::CatalogError;
 use crate::{bind_data_type, Binder};

 pub async fn handle_create_aggregate(
     handler_args: HandlerArgs,
     or_replace: bool,
+    if_not_exists: bool,
     name: ObjectName,
     args: Vec<OperateFunctionArg>,
     returns: AstDataType,
@@ -74,20 +75,18 @@ pub async fn handle_create_aggregate(
     // resolve database and schema id
     let session = &handler_args.session;
     let db_name = &session.database();
-    let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
+    let (schema_name, function_name) =
+        Binder::resolve_schema_qualified_name(db_name, name.clone())?;
     let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

     // check if the function exists in the catalog
-    if (session.env().catalog_reader().read_guard())
-        .get_schema_by_id(&database_id, &schema_id)?
-        .get_function_by_name_args(&function_name, &arg_types)
-        .is_some()
-    {
-        let name = format!(
-            "{function_name}({})",
-            arg_types.iter().map(|t| t.to_string()).join(",")
-        );
-        return Err(CatalogError::Duplicated("function", name).into());
+    if let Either::Right(resp) = session.check_function_name_duplicated(
+        StatementType::CREATE_FUNCTION,
+        name,
+        &arg_types,
+        if_not_exists,
+    )? {
+        return Ok(resp);
     }

     let link = match &params.using {
diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs
index b87d3c90a3488..c212eaebb56f4 100644
--- a/src/frontend/src/handler/create_function.rs
+++ b/src/frontend/src/handler/create_function.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 use anyhow::Context;
+use either::Either;
 use risingwave_common::catalog::FunctionId;
 use risingwave_common::types::StructType;
 use risingwave_expr::sig::{CreateFunctionOptions, UdfKind};
@@ -20,13 +21,13 @@ use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction};
 use risingwave_pb::catalog::Function;

 use super::*;
-use crate::catalog::CatalogError;
 use crate::{bind_data_type, Binder};

 pub async fn handle_create_function(
     handler_args: HandlerArgs,
     or_replace: bool,
     temporary: bool,
+    if_not_exists: bool,
     name: ObjectName,
     args: Option<Vec<OperateFunctionArg>>,
     returns: Option<CreateFunctionReturns>,
@@ -107,20 +108,18 @@ pub async fn handle_create_function(
     // resolve database and schema id
     let session = &handler_args.session;
     let db_name = &session.database();
-    let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
+    let (schema_name, function_name) =
+        Binder::resolve_schema_qualified_name(db_name, name.clone())?;
     let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

     // check if the function exists in the catalog
-    if (session.env().catalog_reader().read_guard())
-        .get_schema_by_id(&database_id, &schema_id)?
-        .get_function_by_name_args(&function_name, &arg_types)
-        .is_some()
-    {
-        let name = format!(
-            "{function_name}({})",
-            arg_types.iter().map(|t| t.to_string()).join(",")
-        );
-        return Err(CatalogError::Duplicated("function", name).into());
+    if let Either::Right(resp) = session.check_function_name_duplicated(
+        StatementType::CREATE_FUNCTION,
+        name,
+        &arg_types,
+        if_not_exists,
+    )? {
+        return Ok(resp);
     }

     let link = match &params.using {
diff --git a/src/frontend/src/handler/create_sql_function.rs b/src/frontend/src/handler/create_sql_function.rs
index 4725b37ab6511..71a31ce5173cc 100644
--- a/src/frontend/src/handler/create_sql_function.rs
+++ b/src/frontend/src/handler/create_sql_function.rs
@@ -14,6 +14,7 @@

 use std::collections::HashMap;

+use either::Either;
 use fancy_regex::Regex;
 use risingwave_common::catalog::FunctionId;
 use risingwave_common::types::{DataType, StructType};
@@ -23,7 +24,6 @@ use risingwave_sqlparser::parser::{Parser, ParserError};

 use super::*;
 use crate::binder::UdfContext;
-use crate::catalog::CatalogError;
 use crate::expr::{Expr, ExprImpl, Literal};
 use crate::{bind_data_type, Binder};

@@ -122,6 +122,7 @@ pub async fn handle_create_sql_function(
     handler_args: HandlerArgs,
     or_replace: bool,
     temporary: bool,
+    if_not_exists: bool,
     name: ObjectName,
     args: Option<Vec<OperateFunctionArg>>,
     returns: Option<CreateFunctionReturns>,
@@ -214,20 +215,18 @@ pub async fn handle_create_sql_function(
     // resolve database and schema id
     let session = &handler_args.session;
     let db_name = &session.database();
-    let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
+    let (schema_name, function_name) =
+        Binder::resolve_schema_qualified_name(db_name, name.clone())?;
     let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

     // check if function exists
-    if (session.env().catalog_reader().read_guard())
-        .get_schema_by_id(&database_id, &schema_id)?
-        .get_function_by_name_args(&function_name, &arg_types)
-        .is_some()
-    {
-        let name = format!(
-            "{function_name}({})",
-            arg_types.iter().map(|t| t.to_string()).join(",")
-        );
-        return Err(CatalogError::Duplicated("function", name).into());
+    if let Either::Right(resp) = session.check_function_name_duplicated(
+        StatementType::CREATE_FUNCTION,
+        name,
+        &arg_types,
+        if_not_exists,
+    )? {
+        return Ok(resp);
     }

     // Parse function body here
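All three handlers now share one duplicate-name check and early-return on `Either::Right`. A minimal sketch of that control flow under simplified stand-in types (only the `either` crate is assumed):

    use either::Either;

    struct Response(String);

    // Left(()) means "no conflict, keep going"; Right(resp) carries a ready
    // "skipping" notice for the IF NOT EXISTS case.
    fn check_duplicated(
        exists: bool,
        if_not_exists: bool,
    ) -> Result<Either<(), Response>, String> {
        if exists {
            if if_not_exists {
                Ok(Either::Right(Response(
                    "function \"add(integer,integer)\" already exists, skipping".to_owned(),
                )))
            } else {
                Err("function with name add(integer,integer) exists".to_owned())
            }
        } else {
            Ok(Either::Left(()))
        }
    }

    fn handle_create(exists: bool, if_not_exists: bool) -> Result<Response, String> {
        if let Either::Right(resp) = check_duplicated(exists, if_not_exists)? {
            return Ok(resp); // early return with the notice; nothing is created
        }
        Ok(Response("CREATE_FUNCTION".to_owned()))
    }

    fn main() {
        assert!(handle_create(false, false).is_ok());
        assert!(handle_create(true, false).is_err());
        assert!(handle_create(true, true).is_ok());
    }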
diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs
index e5d20ce289724..5f55f9aac378c 100644
--- a/src/frontend/src/handler/explain.rs
+++ b/src/frontend/src/handler/explain.rs
@@ -220,6 +220,7 @@ async fn do_handle_explain(
                 worker_node_manager_reader,
                 session.env().catalog_reader().clone(),
                 session.config().batch_parallelism().0,
+                session.config().timezone().to_owned(),
                 plan.clone(),
             )?);
             batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index bd559b9245fc7..4245e66c3034a 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -278,6 +278,7 @@ pub async fn handle(
         Statement::CreateFunction {
             or_replace,
             temporary,
+            if_not_exists,
             name,
             args,
             returns,
@@ -298,6 +299,7 @@ pub async fn handle(
                     handler_args,
                     or_replace,
                     temporary,
+                    if_not_exists,
                     name,
                     args,
                     returns,
@@ -310,6 +312,7 @@ pub async fn handle(
                     handler_args,
                     or_replace,
                     temporary,
+                    if_not_exists,
                     name,
                     args,
                     returns,
@@ -320,6 +323,7 @@ pub async fn handle(
         }
         Statement::CreateAggregate {
             or_replace,
+            if_not_exists,
             name,
             args,
             returns,
@@ -329,6 +333,7 @@ pub async fn handle(
             create_aggregate::handle_create_aggregate(
                 handler_args,
                 or_replace,
+                if_not_exists,
                 name,
                 args,
                 returns,
diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs
index 4f4b1f2187aa0..ec5a3ee393d40 100644
--- a/src/frontend/src/handler/query.rs
+++ b/src/frontend/src/handler/query.rs
@@ -373,6 +373,7 @@ pub fn gen_batch_plan_fragmenter(
         worker_node_manager_reader,
         session.env().catalog_reader().clone(),
         session.config().batch_parallelism().0,
+        session.config().timezone().to_owned(),
         plan,
     )?;
diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs
index a843aa1b59fe4..cf6319c894a0e 100644
--- a/src/frontend/src/scheduler/distributed/query.rs
+++ b/src/frontend/src/scheduler/distributed/query.rs
@@ -734,6 +734,7 @@ pub(crate) mod tests {
             worker_node_selector,
            catalog_reader,
             None,
+            "UTC".to_owned(),
             batch_exchange_node.clone(),
         )
         .unwrap();
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 76f5428348003..3949f1f2d9f5d 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;

 use anyhow::anyhow;
 use async_recursion::async_recursion;
+use chrono::{MappedLocalTime, TimeZone};
 use enum_as_inner::EnumAsInner;
 use futures::TryStreamExt;
 use iceberg::expr::Predicate as IcebergPredicate;
@@ -33,6 +34,7 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::{Schema, TableDesc};
 use risingwave_common::hash::table_distribution::TableDistribution;
 use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping};
+use risingwave_common::types::Timestamptz;
 use risingwave_common::util::scan_range::ScanRange;
 use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
 use risingwave_connector::source::filesystem::opendal_source::{
@@ -145,6 +147,7 @@ pub struct BatchPlanFragmenter {
     catalog_reader: CatalogReader,

     batch_parallelism: usize,
+    timezone: String,

     stage_graph_builder: Option<StageGraphBuilder>,
     stage_graph: Option<StageGraph>,
@@ -163,6 +166,7 @@ impl BatchPlanFragmenter {
         worker_node_manager: WorkerNodeSelector,
         catalog_reader: CatalogReader,
         batch_parallelism: Option<NonZeroU64>,
+        timezone: String,
         batch_node: PlanRef,
     ) -> SchedulerResult<Self> {
         // if batch_parallelism is None, it means no limit, we will use the available nodes count as
@@ -186,6 +190,7 @@ impl BatchPlanFragmenter {
             worker_node_manager,
             catalog_reader,
             batch_parallelism,
+            timezone,
             stage_graph_builder: Some(StageGraphBuilder::new(batch_parallelism)),
             stage_graph: None,
         };
@@ -311,7 +316,11 @@ impl SourceScanInfo {
         Self::Incomplete(fetch_info)
     }

-    pub async fn complete(self, batch_parallelism: usize) -> SchedulerResult<Self> {
+    pub async fn complete(
+        self,
+        batch_parallelism: usize,
+        timezone: String,
+    ) -> SchedulerResult<Self> {
         let fetch_info = match self {
             SourceScanInfo::Incomplete(fetch_info) => fetch_info,
             SourceScanInfo::Complete(_) => {
@@ -382,15 +391,36 @@ impl SourceScanInfo {
                     Some(AsOf::VersionString(_)) => {
                         bail!("Unsupported version string in iceberg time travel")
                     }
-                    Some(AsOf::TimestampString(ts)) => Some(
-                        speedate::DateTime::parse_str_rfc3339(&ts)
-                            .map(|t| {
-                                IcebergTimeTravelInfo::TimestampMs(
-                                    t.timestamp_tz() * 1000 + t.time.microsecond as i64 / 1000,
-                                )
-                            })
-                            .map_err(|_e| anyhow!("fail to parse timestamp"))?,
-                    ),
+                    Some(AsOf::TimestampString(ts)) => {
+                        let date_time = speedate::DateTime::parse_str_rfc3339(&ts)
+                            .map_err(|_e| anyhow!("fail to parse timestamp"))?;
+                        let timestamp = if date_time.time.tz_offset.is_none() {
+                            // If the input does not specify a time zone, use the time zone set by the "SET TIME ZONE" command.
+                            let tz =
+                                Timestamptz::lookup_time_zone(&timezone).map_err(|e| anyhow!(e))?;
+                            match tz.with_ymd_and_hms(
+                                date_time.date.year.into(),
+                                date_time.date.month.into(),
+                                date_time.date.day.into(),
+                                date_time.time.hour.into(),
+                                date_time.time.minute.into(),
+                                date_time.time.second.into(),
+                            ) {
+                                MappedLocalTime::Single(d) => Ok(d.timestamp()),
+                                MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => {
+                                    Err(anyhow!(format!(
+                                        "failed to parse the timestamp {ts} with the specified time zone {tz}"
+                                    )))
+                                }
+                            }?
+                        } else {
+                            date_time.timestamp_tz()
+                        };
+
+                        Some(IcebergTimeTravelInfo::TimestampMs(
+                            timestamp * 1000 + date_time.time.microsecond as i64 / 1000,
+                        ))
+                    }
                     Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                         unreachable!()
                     }
@@ -731,6 +761,7 @@ impl StageGraph {
         self,
         catalog_reader: &CatalogReader,
         worker_node_manager: &WorkerNodeSelector,
+        timezone: String,
     ) -> SchedulerResult<StageGraph> {
         let mut complete_stages = HashMap::new();
         self.complete_stage(
@@ -739,6 +770,7 @@ impl StageGraph {
             &mut complete_stages,
             catalog_reader,
             worker_node_manager,
+            timezone,
         )
         .await?;
         Ok(StageGraph {
@@ -758,6 +790,7 @@ impl StageGraph {
         complete_stages: &mut HashMap<StageId, QueryStageRef>,
         catalog_reader: &CatalogReader,
         worker_node_manager: &WorkerNodeSelector,
+        timezone: String,
     ) -> SchedulerResult<()> {
         let parallelism = if stage.parallelism.is_some() {
             // If the stage has parallelism, it means it's a complete stage.
@@ -772,7 +805,7 @@ impl StageGraph {
                 .as_ref()
                 .unwrap()
                 .clone()
-                .complete(self.batch_parallelism)
+                .complete(self.batch_parallelism, timezone.to_owned())
                 .await?;

             // For batch reading file source, the number of files involved is typically large.
@@ -842,6 +875,7 @@ impl StageGraph {
                 complete_stages,
                 catalog_reader,
                 worker_node_manager,
+                timezone.to_owned(),
             )
             .await?;
         }
@@ -935,7 +969,11 @@ impl BatchPlanFragmenter {
     pub async fn generate_complete_query(self) -> SchedulerResult<Query> {
         let stage_graph = self.stage_graph.unwrap();
         let new_stage_graph = stage_graph
-            .complete(&self.catalog_reader, &self.worker_node_manager)
+            .complete(
+                &self.catalog_reader,
+                &self.worker_node_manager,
+                self.timezone.to_owned(),
+            )
             .await?;
         Ok(Query {
             query_id: self.query_id,
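The key behavioral change above: an `AS OF` timestamp without an explicit offset is now interpreted in the session time zone, and chrono's `MappedLocalTime` forces DST gaps and overlaps to surface as errors instead of silently picking an instant. A self-contained sketch of that pattern (assuming the `chrono` and `chrono-tz` crates; RisingWave resolves the zone through `Timestamptz::lookup_time_zone` instead):

    use chrono::{MappedLocalTime, TimeZone};
    use chrono_tz::Tz;

    fn to_epoch_seconds(
        tz: Tz,
        (y, mo, d): (i32, u32, u32),
        (h, mi, s): (u32, u32, u32),
    ) -> Result<i64, String> {
        match tz.with_ymd_and_hms(y, mo, d, h, mi, s) {
            // Exactly one instant: safe to use.
            MappedLocalTime::Single(dt) => Ok(dt.timestamp()),
            // DST overlap (two instants) or gap (none): reject.
            MappedLocalTime::Ambiguous(_, _) | MappedLocalTime::None => Err(format!(
                "{y}-{mo:02}-{d:02} {h:02}:{mi:02}:{s:02} is not a unique instant in {tz}"
            )),
        }
    }

    fn main() {
        let tz: Tz = "America/Los_Angeles".parse().unwrap();
        // 02:30 on 2024-03-10 does not exist in this zone (spring-forward gap).
        assert!(to_epoch_seconds(tz, (2024, 3, 10), (2, 30, 0)).is_err());
        // An ordinary local time maps to exactly one instant.
        assert!(to_epoch_seconds(tz, (2024, 6, 1), (12, 0, 0)).is_ok());
    }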
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index fe03af7c0c786..7f99d91d6e0d5 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -22,6 +22,7 @@ use std::time::{Duration, Instant};
 use anyhow::anyhow;
 use bytes::Bytes;
 use either::Either;
+use itertools::Itertools;
 use parking_lot::{Mutex, RwLock, RwLockReadGuard};
 use pgwire::error::{PsqlError, PsqlResult};
 use pgwire::net::{Address, AddressRef};
@@ -961,6 +962,44 @@ impl SessionImpl {
             .map_err(RwError::from)
     }

+    pub fn check_function_name_duplicated(
+        &self,
+        stmt_type: StatementType,
+        name: ObjectName,
+        arg_types: &[DataType],
+        if_not_exists: bool,
+    ) -> Result<Either<(), RwPgResponse>> {
+        let db_name = &self.database();
+        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
+        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
+
+        let catalog_reader = self.env().catalog_reader().read_guard();
+        if catalog_reader
+            .get_schema_by_id(&database_id, &schema_id)?
+            .get_function_by_name_args(&function_name, arg_types)
+            .is_some()
+        {
+            let full_name = format!(
+                "{function_name}({})",
+                arg_types.iter().map(|t| t.to_string()).join(",")
+            );
+            if if_not_exists {
+                Ok(Either::Right(
+                    PgResponse::builder(stmt_type)
+                        .notice(format!(
+                            "function \"{}\" already exists, skipping",
+                            full_name
+                        ))
+                        .into(),
+                ))
+            } else {
+                Err(CatalogError::Duplicated("function", full_name).into())
+            }
+        } else {
+            Ok(Either::Left(()))
+        }
+    }
+
     /// Also check if the user has the privilege to create in the schema.
     pub fn get_database_and_schema_id_for_create(
         &self,
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index f30e20b50f0b8..8676b2583227c 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -17,6 +17,7 @@ use std::num::NonZeroUsize;

 use anyhow::anyhow;
 use itertools::Itertools;
+use risingwave_common::config::DefaultParallelism;
 use risingwave_common::hash::VnodeCountCompat;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
@@ -119,9 +120,10 @@ impl CatalogController {
         let txn = inner.db.begin().await?;
         let create_type = streaming_job.create_type();

-        let streaming_parallelism = match parallelism {
-            None => StreamingParallelism::Adaptive,
-            Some(n) => StreamingParallelism::Fixed(n.parallelism as _),
+        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
+            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
+            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
+            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
         };

         ensure_user_id(streaming_job.owner() as _, &txn).await?;
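The match above encodes a simple precedence: an explicit parallelism on the job wins; otherwise the meta option decides between adaptive scheduling and a fixed degree. A stand-alone sketch with stand-in enums (not the real meta types):

    use std::num::NonZeroUsize;

    enum DefaultParallelism {
        Full,                  // no default configured: schedule adaptively
        Default(NonZeroUsize), // cluster-wide default degree
    }

    #[derive(Debug, PartialEq)]
    enum StreamingParallelism {
        Adaptive,
        Fixed(usize),
    }

    fn resolve(requested: Option<usize>, default: DefaultParallelism) -> StreamingParallelism {
        match (requested, default) {
            (Some(n), _) => StreamingParallelism::Fixed(n),
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
        }
    }

    fn main() {
        let two = NonZeroUsize::new(2).unwrap();
        assert_eq!(
            resolve(None, DefaultParallelism::Default(two)),
            StreamingParallelism::Fixed(2)
        );
        assert_eq!(
            resolve(None, DefaultParallelism::Full),
            StreamingParallelism::Adaptive
        );
        assert_eq!(
            resolve(Some(8), DefaultParallelism::Default(two)),
            StreamingParallelism::Fixed(8)
        );
    }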
diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs
index 7bd7034ae1f99..eba07b7002244 100644
--- a/src/meta/src/hummock/manager/gc.rs
+++ b/src/meta/src/hummock/manager/gc.rs
@@ -535,7 +535,7 @@ impl HummockManager {
                 break;
             }
             let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect();
-            tracing::debug!(?delete_batch, "Attempt to delete objects.");
+            tracing::info!(?delete_batch, "Attempt to delete objects.");
             let deleted_object_ids = delete_batch.clone();
             self.gc_manager
                 .delete_objects(delete_batch.into_iter())
diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs
index 16496b71c97eb..98f8599ccf89a 100644
--- a/src/sqlparser/src/ast/mod.rs
+++ b/src/sqlparser/src/ast/mod.rs
@@ -1349,6 +1349,7 @@ pub enum Statement {
     CreateFunction {
         or_replace: bool,
         temporary: bool,
+        if_not_exists: bool,
         name: ObjectName,
         args: Option<Vec<OperateFunctionArg>>,
         returns: Option<CreateFunctionReturns>,
@@ -1361,6 +1362,7 @@ pub enum Statement {
     /// Postgres: <https://www.postgresql.org/docs/current/sql-createaggregate.html>
     CreateAggregate {
         or_replace: bool,
+        if_not_exists: bool,
         name: ObjectName,
         args: Vec<OperateFunctionArg>,
         returns: DataType,
@@ -1768,6 +1770,7 @@ impl fmt::Display for Statement {
             Statement::CreateFunction {
                 or_replace,
                 temporary,
+                if_not_exists,
                 name,
                 args,
                 returns,
@@ -1776,9 +1779,10 @@ impl fmt::Display for Statement {
             } => {
                 write!(
                     f,
-                    "CREATE {or_replace}{temp}FUNCTION {name}",
+                    "CREATE {or_replace}{temp}FUNCTION {if_not_exists}{name}",
                     temp = if *temporary { "TEMPORARY " } else { "" },
                     or_replace = if *or_replace { "OR REPLACE " } else { "" },
+                    if_not_exists = if *if_not_exists { "IF NOT EXISTS " } else { "" },
                 )?;
                 if let Some(args) = args {
                     write!(f, "({})", display_comma_separated(args))?;
@@ -1792,6 +1796,7 @@ impl fmt::Display for Statement {
             }
             Statement::CreateAggregate {
                 or_replace,
+                if_not_exists,
                 name,
                 args,
                 returns,
@@ -1800,8 +1805,9 @@ impl fmt::Display for Statement {
             } => {
                 write!(
                     f,
-                    "CREATE {or_replace}AGGREGATE {name}",
+                    "CREATE {or_replace}AGGREGATE {if_not_exists}{name}",
                     or_replace = if *or_replace { "OR REPLACE " } else { "" },
+                    if_not_exists = if *if_not_exists { "IF NOT EXISTS " } else { "" },
                 )?;
                 write!(f, "({})", display_comma_separated(args))?;
                 write!(f, " RETURNS {}", returns)?;
@@ -3551,8 +3557,9 @@ mod tests {
     #[test]
     fn test_create_function_display() {
         let create_function = Statement::CreateFunction {
-            temporary: false,
             or_replace: false,
+            temporary: false,
+            if_not_exists: false,
             name: ObjectName(vec![Ident::new_unchecked("foo")]),
             args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]),
             returns: Some(CreateFunctionReturns::Value(DataType::Int)),
@@ -3573,8 +3580,9 @@ mod tests {
             format!("{}", create_function)
         );
         let create_function = Statement::CreateFunction {
-            temporary: false,
             or_replace: false,
+            temporary: false,
+            if_not_exists: false,
             name: ObjectName(vec![Ident::new_unchecked("foo")]),
             args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]),
             returns: Some(CreateFunctionReturns::Value(DataType::Int)),
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 9eb3d9e439967..2df0183cf5c5a 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -2210,6 +2210,8 @@ impl Parser<'_> {
         or_replace: bool,
         temporary: bool,
     ) -> PResult<Statement> {
+        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self);
+
         let name = self.parse_object_name()?;
         self.expect_token(&Token::LParen)?;
         let args = if self.peek_token().token == Token::RParen {
@@ -2248,6 +2250,7 @@ impl Parser<'_> {
         Ok(Statement::CreateFunction {
             or_replace,
             temporary,
+            if_not_exists,
             name,
             args,
             returns: return_type,
@@ -2257,6 +2260,8 @@ impl Parser<'_> {
     }

     fn parse_create_aggregate(&mut self, or_replace: bool) -> PResult<Statement> {
+        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self);
+
         let name = self.parse_object_name()?;
         self.expect_token(&Token::LParen)?;
         let args = self.parse_comma_separated(Parser::parse_function_arg)?;
@@ -2270,6 +2275,7 @@ impl Parser<'_> {
         Ok(Statement::CreateAggregate {
             or_replace,
+            if_not_exists,
             name,
             args,
             returns,
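`impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self)` reads as: consume the keyword sequence if and only if all three appear next, binding the outcome to a bool. A conceptual stand-in (not the real macro expansion or token types):

    #[derive(Clone, Copy, PartialEq, Debug)]
    enum Keyword {
        If,
        Not,
        Exists,
    }

    struct Parser {
        tokens: Vec<Keyword>,
        pos: usize,
    }

    impl Parser {
        // All-or-nothing lookahead: on a partial match the stream is untouched.
        fn parse_keywords(&mut self, expected: &[Keyword]) -> bool {
            let end = self.pos + expected.len();
            if self.tokens.get(self.pos..end) == Some(expected) {
                self.pos = end; // consume only on a full match
                true
            } else {
                false
            }
        }
    }

    fn main() {
        let mut p = Parser {
            tokens: vec![Keyword::If, Keyword::Not, Keyword::Exists],
            pos: 0,
        };
        let if_not_exists = p.parse_keywords(&[Keyword::If, Keyword::Not, Keyword::Exists]);
        assert!(if_not_exists && p.pos == 3);
    }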
diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs
index 7acf6d29b4444..1466a9024a6d5 100644
--- a/src/sqlparser/tests/sqlparser_postgres.rs
+++ b/src/sqlparser/tests/sqlparser_postgres.rs
@@ -753,6 +753,7 @@ fn parse_create_function() {
         Statement::CreateFunction {
             or_replace: false,
             temporary: false,
+            if_not_exists: false,
             name: ObjectName(vec![Ident::new_unchecked("add")]),
             args: Some(vec![
                 OperateFunctionArg::unnamed(DataType::Int),
@@ -777,6 +778,7 @@ fn parse_create_function() {
         Statement::CreateFunction {
             or_replace: false,
             temporary: false,
+            if_not_exists: false,
             name: ObjectName(vec![Ident::new_unchecked("sub")]),
             args: Some(vec![
                 OperateFunctionArg::unnamed(DataType::Int),
@@ -801,6 +803,7 @@ fn parse_create_function() {
         Statement::CreateFunction {
             or_replace: false,
             temporary: false,
+            if_not_exists: false,
             name: ObjectName(vec![Ident::new_unchecked("return_test")]),
             args: Some(vec![
                 OperateFunctionArg::unnamed(DataType::Int),
@@ -826,6 +829,7 @@ fn parse_create_function() {
         Statement::CreateFunction {
             or_replace: true,
             temporary: false,
+            if_not_exists: false,
             name: ObjectName(vec![Ident::new_unchecked("add")]),
             args: Some(vec![
                 OperateFunctionArg::with_name("a", DataType::Int),
@@ -851,12 +855,14 @@ fn parse_create_function() {
         }
     );

-    let sql = "CREATE FUNCTION unnest(a INT[]) RETURNS TABLE (x INT) LANGUAGE SQL RETURN a";
+    let sql =
+        "CREATE TEMPORARY FUNCTION unnest(a INT[]) RETURNS TABLE (x INT) LANGUAGE SQL RETURN a";
     assert_eq!(
         verified_stmt(sql),
         Statement::CreateFunction {
             or_replace: false,
-            temporary: false,
+            temporary: true,
+            if_not_exists: false,
             name: ObjectName(vec![Ident::new_unchecked("unnest")]),
             args: Some(vec![OperateFunctionArg::with_name(
                 "a",
@@ -874,6 +880,32 @@ fn parse_create_function() {
             with_options: Default::default(),
         }
     );
+
+    let sql =
+        "CREATE FUNCTION IF NOT EXISTS add(INT, INT) RETURNS INT LANGUAGE SQL IMMUTABLE AS 'select $1 + $2;'";
+    assert_eq!(
+        verified_stmt(sql),
+        Statement::CreateFunction {
+            or_replace: false,
+            temporary: false,
+            if_not_exists: true,
+            name: ObjectName(vec![Ident::new_unchecked("add")]),
+            args: Some(vec![
+                OperateFunctionArg::unnamed(DataType::Int),
+                OperateFunctionArg::unnamed(DataType::Int),
+            ]),
+            returns: Some(CreateFunctionReturns::Value(DataType::Int)),
+            params: CreateFunctionBody {
+                language: Some("SQL".into()),
+                behavior: Some(FunctionBehavior::Immutable),
+                as_: Some(FunctionDefinition::SingleQuotedDef(
+                    "select $1 + $2;".into()
+                )),
+                ..Default::default()
+            },
+            with_options: Default::default(),
+        }
+    );
 }

 #[test]
@@ -884,6 +916,27 @@ fn parse_create_aggregate() {
         verified_stmt(sql),
         Statement::CreateAggregate {
             or_replace: true,
+            if_not_exists: false,
+            name: ObjectName(vec![Ident::new_unchecked("sum")]),
+            args: vec![OperateFunctionArg::unnamed(DataType::Int)],
+            returns: DataType::BigInt,
+            append_only: true,
+            params: CreateFunctionBody {
+                language: Some("python".into()),
+                as_: Some(FunctionDefinition::SingleQuotedDef("sum".into())),
+                using: Some(CreateFunctionUsing::Link("xxx".into())),
+                ..Default::default()
+            },
+        }
+    );
+
+    let sql =
+        "CREATE AGGREGATE IF NOT EXISTS sum(INT) RETURNS BIGINT APPEND ONLY LANGUAGE python AS 'sum' USING LINK 'xxx'";
+    assert_eq!(
+        verified_stmt(sql),
+        Statement::CreateAggregate {
+            or_replace: false,
+            if_not_exists: true,
             name: ObjectName(vec![Ident::new_unchecked("sum")]),
             args: vec![OperateFunctionArg::unnamed(DataType::Int)],
             returns: DataType::BigInt,
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index 21f113b7b9858..a3fd28bb54293 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -14,6 +14,7 @@

 #![cfg_attr(not(madsim), allow(unused_imports))]

+use std::cmp::max;
 use std::collections::HashMap;
 use std::future::Future;
 use std::io::Write;
@@ -206,6 +207,37 @@ metrics_level = "Disabled"
         }
     }

+    pub fn for_default_parallelism(default_parallelism: usize) -> Self {
+        let config_path = {
+            let mut file =
+                tempfile::NamedTempFile::new().expect("failed to create temp config file");
+
+            let config_data = format!(
+                r#"
+[server]
+telemetry_enabled = false
+metrics_level = "Disabled"
+
+[meta]
+default_parallelism = {default_parallelism}
+"#
+            )
+            .to_owned();
+            file.write_all(config_data.as_bytes())
+                .expect("failed to write config file");
+            file.into_temp_path()
+        };
+
+        Configuration {
+            config_path: ConfigPath::Temp(config_path.into()),
+            frontend_nodes: 1,
+            compute_nodes: 1,
+            meta_nodes: 1,
+            compactor_nodes: 1,
+            compute_node_cores: default_parallelism * 2,
+            per_session_queries: vec![].into(),
+        }
+    }
+
     /// Returns the config for backfill test.
     pub fn for_backfill() -> Self {
         // Embed the config file and create a temporary file at runtime. The file will be deleted
diff --git a/src/tests/simulation/tests/integration_tests/default_parallelism.rs b/src/tests/simulation/tests/integration_tests/default_parallelism.rs
new file mode 100644
index 0000000000000..2cb79d5c3a8cf
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/default_parallelism.rs
@@ -0,0 +1,56 @@
+// Copyright 2025 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use anyhow::Result;
+use risingwave_simulation::cluster::{Cluster, Configuration};
+use risingwave_simulation::utils::AssertResult;
+
+#[tokio::test]
+async fn test_no_default_parallelism() -> Result<()> {
+    let mut cluster = Cluster::start(Configuration::default()).await?;
+
+    let mut session = cluster.start_session();
+
+    session.run("create table t(v int);").await?;
+    session
+        .run("select parallelism from rw_streaming_parallelism where name = 't';")
+        .await?
+        .assert_result_eq("ADAPTIVE");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_default_parallelism() -> Result<()> {
+    let mut cluster = Cluster::start(Configuration::for_default_parallelism(2)).await?;
+
+    let mut session = cluster.start_session();
+
+    session.run("create table t(v int);").await?;
+    session
+        .run("select parallelism from rw_streaming_parallelism where name = 't';")
+        .await?
+        .assert_result_eq("FIXED(2)");
+
+    session
+        .run("alter table t set parallelism = adaptive;")
+        .await?;
+
+    session
+        .run("select parallelism from rw_streaming_parallelism where name = 't';")
+        .await?
+        .assert_result_eq("ADAPTIVE");
+
+    Ok(())
+}
diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs
index 8244a0bb48f27..ad8e854a30e5f 100644
--- a/src/tests/simulation/tests/integration_tests/main.rs
+++ b/src/tests/simulation/tests/integration_tests/main.rs
@@ -29,4 +29,5 @@ mod storage;
 mod throttle;

 mod compaction;
+mod default_parallelism;
 mod utils;