diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java index f312a91a9..8c1e1dcd7 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java @@ -46,6 +46,7 @@ public enum SeaTunnelPluginMapping { SOURCE_HTTP(SEATUNNEL, SOURCE, HTTP, "connector-http-base", BETA, BATCH, STREAM, SCHEMA_PROJECTION), SINK_HTTP(SEATUNNEL, SINK, HTTP, "connector-http-base", BETA), SINK_FEISHU(SEATUNNEL, SINK, FEISHU, "connector-http-feishu", ALPHA), + @Deprecated SINK_WECHAT(SEATUNNEL, SINK, WECHAT, "connector-http-wechat", ALPHA), SINK_DINGTALK(SEATUNNEL, SINK, DINGTALK, "connector-dingtalk", ALPHA), SOURCE_MYHOURS(SEATUNNEL, SOURCE, MYHOURS, "connector-http-myhours", ALPHA, BATCH, SCHEMA_PROJECTION), @@ -86,6 +87,7 @@ public enum SeaTunnelPluginMapping { SINK_AMAZON_SQS(SEATUNNEL, SINK, AMAZON_SQS, "connector-amazonsqs", UNKNOWN), SOURCE_RABBITMQ(SEATUNNEL, SOURCE, RABBITMQ, "connector-rabbitmq", BETA, STREAM, EXACTLY_ONCE, SCHEMA_PROJECTION), SINK_RABBITMQ(SEATUNNEL, SINK, RABBITMQ, "connector-rabbitmq", BETA), + SINK_ACTIVEMQ(SEATUNNEL, SINK, ACTIVEMQ, "connector-activemq", BETA), SOURCE_JDBC(SEATUNNEL, SOURCE, JDBC, "connector-jdbc", GA, BATCH, SCHEMA_PROJECTION, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT), SINK_JDBC(SEATUNNEL, SINK, JDBC, "connector-jdbc", GA, EXACTLY_ONCE, CDC), @@ -101,6 +103,7 @@ public enum SeaTunnelPluginMapping { SINK_AMAZON_DYNAMODB(SEATUNNEL, SINK, AMAZON_DYNAMODB, "connector-amazondynamodb", BETA), SOURCE_CASSANDRA(SEATUNNEL, SOURCE, CASSANDRA, "connector-cassandra", BETA, BATCH, SCHEMA_PROJECTION), SINK_CASSANDRA(SEATUNNEL, SINK, CASSANDRA, "connector-cassandra", BETA), + SOURCE_TABLESTORE(SEATUNNEL, SOURCE, TABLESTORE, "connector-tablestore", ALPHA), SINK_TABLESTORE(SEATUNNEL, SINK, TABLESTORE, "connector-tablestore", ALPHA), SINK_GOOGLE_FIRE_STORE(SEATUNNEL, SINK, GOOGLE_FIRE_STORE, "connector-google-firestore", UNKNOWN), @@ -108,6 +111,7 @@ public enum SeaTunnelPluginMapping { SOURCE_SQLSERVER_CDC(SEATUNNEL, SOURCE, SQLSERVER_CDC, "connector-cdc-sqlserver", GA, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT), SOURCE_ORACLE_CDC(SEATUNNEL, SOURCE, ORACLE_CDC, "connector-cdc-oracle", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT), SOURCE_POSTGRESQL_CDC(SEATUNNEL, SOURCE, POSTGRESQL_CDC, "connector-cdc-postgres", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT), + SOURCE_OPENGAUSS_CDC(SEATUNNEL, SOURCE, OPENGAUSS_CDC, "connector-cdc-opengauss", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT), SOURCE_MONGODB_CDC(SEATUNNEL, SOURCE, MONGODB_CDC, "connector-cdc-mongodb", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT), SOURCE_HIVE(SEATUNNEL, SOURCE, HIVE, "connector-hive", GA, BATCH, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM), @@ -128,9 +132,11 @@ public enum SeaTunnelPluginMapping { SINK_S3REDSHIFT(SEATUNNEL, SINK, S3REDSHIFT, "connector-s3-redshift", GA, EXACTLY_ONCE), SOURCE_MAXCOMPUTE(SEATUNNEL, SOURCE, MAXCOMPUTE, "connector-maxcompute", ALPHA, BATCH, PARALLELISM), SINK_MAXCOMPUTE(SEATUNNEL, SINK, MAXCOMPUTE, "connector-maxcompute", ALPHA), + SOURCE_HBASE(SEATUNNEL, SOURCE, HBASE, "connector-hbase", ALPHA), SINK_HBASE(SEATUNNEL, SINK, HBASE, "connector-hbase", ALPHA), SOURCE_KUDU(SEATUNNEL, SOURCE, KUDU, "connector-kudu", BETA, BATCH), SINK_KUDU(SEATUNNEL, SINK, KUDU, "connector-kudu", BETA), + SINK_DRUID(SEATUNNEL, SINK, DRUID, "connector-druid", BETA), SOURCE_IOTDB(SEATUNNEL, SOURCE, IOTDB, "connector-iotdb", GA, BATCH, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM), SINK_IOTDB(SEATUNNEL, SINK, IOTDB, "connector-iotdb", GA, EXACTLY_ONCE), SOURCE_OPENMLDB(SEATUNNEL, SOURCE, OPENMLDB, "connector-openmldb", BETA, BATCH, STREAM), @@ -140,17 +146,25 @@ public enum SeaTunnelPluginMapping { SINK_INFLUXDB(SEATUNNEL, SINK, INFLUXDB, "connector-influxdb", BETA), SOURCE_TDENGINE(SEATUNNEL, SOURCE, TDENGINE, "connector-tdengine", BETA), SINK_TDENGINE(SEATUNNEL, SINK, TDENGINE, "connector-tdengine", BETA), + SOURCE_SLS(SEATUNNEL, SOURCE, SLS, "connector-sls", BETA), + + SOURCE_MILVUS(SEATUNNEL, SOURCE, MILVUS, "connector-milvus", UNKNOWN), + SINK_MILVUS(SEATUNNEL, SINK, MILVUS, "connector-milvus", UNKNOWN), + SOURCE_WEB3J(SEATUNNEL, SOURCE, WEB3J, "connector-web3j", UNKNOWN), SINK_SENTRY(SEATUNNEL, SINK, SENTRY, "connector-sentry", ALPHA), SOURCE_GOOGLE_SHEETS(SEATUNNEL, SOURCE, GOOGLE_SHEETS, "connector-google-sheets", UNKNOWN, BATCH, SCHEMA_PROJECTION), - TRANSFORM_COPY(SEATUNNEL, TRANSFORM, COPY, "connector-copy", UNKNOWN), - TRANSFORM_FIELD_MAPPER(SEATUNNEL, TRANSFORM, FIELD_MAPPER, "connector-field-mapper", UNKNOWN), - TRANSFORM_FILTER_ROW_KIND(SEATUNNEL, TRANSFORM, FILTER_ROW_KIND, "connector-field-row-kind", UNKNOWN), - TRANSFORM_FILTER(SEATUNNEL, TRANSFORM, FILTER, "connector-filter", UNKNOWN), - TRANSFORM_REPLACE(SEATUNNEL, TRANSFORM, REPLACE, "connector-replace", UNKNOWN), - TRANSFORM_SPLIT(SEATUNNEL, TRANSFORM, SPLIT, "connector-split", UNKNOWN), - TRANSFORM_SQL(SEATUNNEL, TRANSFORM, SQL, "connector-sql", UNKNOWN), + TRANSFORM_COPY(SEATUNNEL, TRANSFORM, COPY, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_FIELD_MAPPER(SEATUNNEL, TRANSFORM, FIELD_MAPPER, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_FILTER_ROW_KIND(SEATUNNEL, TRANSFORM, FILTER_ROW_KIND, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_FILTER(SEATUNNEL, TRANSFORM, FILTER, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_REPLACE(SEATUNNEL, TRANSFORM, REPLACE, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_SPLIT(SEATUNNEL, TRANSFORM, SPLIT, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_SQL(SEATUNNEL, TRANSFORM, SQL, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_JSON_PATH(SEATUNNEL, TRANSFORM, JSON_PATH, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_DYNAMIC_COMPILE(SEATUNNEL, TRANSFORM, DYNAMIC_COMPILE, "seatunnel-transforms-v2", UNKNOWN), + TRANSFORM_LLM(SEATUNNEL, TRANSFORM, LLM, "seatunnel-transforms-v2", UNKNOWN), ; private SeaTunnelEngineType engineType; diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginName.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginName.java index 8b809185d..1e0d048e9 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginName.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginName.java @@ -36,7 +36,6 @@ public enum SeaTunnelPluginName implements DictInstance { SLACK("SlackSink", "Slack"), HTTP("Http", "Http"), FEISHU("Feishu", "Feishu"), - @Deprecated WECHAT("WeChat", "WeChat"), @@ -83,6 +82,7 @@ public enum SeaTunnelPluginName implements DictInstance { SQLSERVER_CDC("SqlServer-CDC", "SqlServer-CDC"), ORACLE_CDC("Oracle-CDC", "Oracle-CDC"), POSTGRESQL_CDC("Postgres-CDC", "PostgreSQL-CDC"), + OPENGAUSS_CDC("Opengauss-CDC", "Opengauss-CDC"), MONGODB_CDC("MongoDB-CDC", "MongoDB-CDC"), HIVE("Hive", "Hive"), diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cassandra/source/CassandraSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cassandra/source/CassandraSourcePlugin.java index 8759882fb..e5da625b2 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cassandra/source/CassandraSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cassandra/source/CassandraSourcePlugin.java @@ -17,9 +17,10 @@ */ package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.cassandra.source; +import cn.sliew.carp.module.datasource.modal.DataSourceInfo; +import cn.sliew.carp.module.datasource.modal.nosql.CassandraDataSourceProperties; +import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginMapping; -import cn.sliew.scaleph.ds.modal.AbstractDataSource; -import cn.sliew.scaleph.ds.modal.nosql.CassandraDataSource; import cn.sliew.scaleph.plugin.framework.core.PluginInfo; import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin; @@ -63,17 +64,19 @@ public List getRequiredResources() { public ObjectNode createConf() { ObjectNode conf = super.createConf(); JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE); - CassandraDataSource dataSource = (CassandraDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode); - conf.putPOJO(HOST.getName(), dataSource.getHost()); - conf.putPOJO(KEYSPACE.getName(), dataSource.getKeyspace()); - if (StringUtils.hasText(dataSource.getUsername())) { - conf.putPOJO(USERNAME.getName(), dataSource.getUsername()); + + DataSourceInfo dataSourceInfo = JacksonUtil.toObject(jsonNode, DataSourceInfo.class); + CassandraDataSourceProperties props = (CassandraDataSourceProperties) dataSourceInfo.getProps(); + conf.putPOJO(HOST.getName(), props.getHost()); + conf.putPOJO(KEYSPACE.getName(), props.getKeyspace()); + if (StringUtils.hasText(props.getUsername())) { + conf.putPOJO(USERNAME.getName(), props.getUsername()); } - if (StringUtils.hasText(dataSource.getPassword())) { - conf.putPOJO(PASSWORD.getName(), dataSource.getPassword()); + if (StringUtils.hasText(props.getPassword())) { + conf.putPOJO(PASSWORD.getName(), props.getPassword()); } - if (StringUtils.hasText(dataSource.getDatacenter())) { - conf.putPOJO(DATACENTER.getName(), dataSource.getDatacenter()); + if (StringUtils.hasText(props.getDatacenter())) { + conf.putPOJO(DATACENTER.getName(), props.getDatacenter()); } return conf; } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourcePlugin.java index 9d5cfeed1..9a866a151 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourcePlugin.java @@ -45,6 +45,8 @@ public OracleCDCSourcePlugin() { props.add(PASSWORD); props.add(DATABASE); props.add(OracleCDCSourceProperties.SCHEMA); + props.add(OracleCDCSourceProperties.USE_SELECT_COUNT); + props.add(OracleCDCSourceProperties.SKIP_ANALYZE); props.add(TABLE); props.add(TABLE_CONFIG); props.add(STARTUP_MODE); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourceProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourceProperties.java index f57efecf2..da9b90611 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourceProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/cdc/oracle/source/OracleCDCSourceProperties.java @@ -35,4 +35,20 @@ public enum OracleCDCSourceProperties { .parser(Parsers.STRING_ARRAY_PARSER) .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); + + public static final PropertyDescriptor USE_SELECT_COUNT = new PropertyDescriptor.Builder() + .name("use_select_count") + .description("Use select count for table count rather then other methods in full stage.In this scenario, select count directly is used when it is faster to update statistics using sql from analysis table") + .type(PropertyType.BOOLEAN) + .parser(Parsers.BOOLEAN_PARSER) + .addValidator(Validators.BOOLEAN_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor SKIP_ANALYZE = new PropertyDescriptor.Builder() + .name("skip_analyze") + .description("Skip the analysis of table count in full stage.In this scenario, you schedule analysis table sql to update related table statistics periodically or your table data does not change frequently") + .type(PropertyType.BOOLEAN) + .parser(Parsers.BOOLEAN_PARSER) + .addValidator(Validators.BOOLEAN_VALIDATOR) + .validateAndBuild(); } diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index 261df43ee..72dfefc89 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -793,8 +793,6 @@ export default { 'pages.project.di.step.cdc.password': '密码', 'pages.project.di.step.cdc.databases': '数据库', 'pages.project.di.step.cdc.databases.placeholder': 'db1, db2', - 'pages.project.di.step.cdc.schemas': 'Schema', - 'pages.project.di.step.cdc.schemas.placeholder': 'DEBEZIUM1, DEBEZIUM2', 'pages.project.di.step.cdc.tables': '表', 'pages.project.di.step.cdc.tables.placeholder': 'table1, table2', 'pages.project.di.step.cdc.tableConfig': '表配置', @@ -832,6 +830,12 @@ export default { 'pages.project.di.step.cdc.debeziums.value.placeholder': 'never', 'pages.project.di.step.cdc.format': 'CDC 格式', + // oracle-cdc + 'pages.project.di.step.oracle-cdc.schemaNames': 'Schema', + 'pages.project.di.step.oracle-cdc.schemaNames.placeholder': 'DEBEZIUM1, DEBEZIUM2', + 'pages.project.di.step.oracle-cdc.useSelectCount': '使用 select count()', + 'pages.project.di.step.oracle-cdc.skipAnalyze': '跳过表 analysis', + // mongodb-cdc 'pages.project.di.step.mongodb-cdc.hosts': '服务器地址', 'pages.project.di.step.mongodb-cdc.hosts.placeholder': 'localhost:27017,localhost:27018', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/constant.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/constant.tsx index 84428031c..2efea8b15 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/constant.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/constant.tsx @@ -702,6 +702,12 @@ export const CDCParams = { format: 'format', }; +export const OracleCDCParams = { + schemaNames: 'schema-names', + useSelectCount: 'use_select_count', + skipAnalyze: 'skip_analyze', +}; + export const MongoDBCDCParams = { hosts: 'hosts', username: 'username', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-cassandra-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-cassandra-step.tsx index aea206346..9a6439210 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-cassandra-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-cassandra-step.tsx @@ -1,5 +1,6 @@ import React, {useEffect} from 'react'; import {Form} from 'antd'; +import {InfoCircleOutlined} from "@ant-design/icons"; import { DrawerForm, ProFormDigit, @@ -15,7 +16,6 @@ import {CassandraParams, STEP_ATTR_TYPE} from '../constant'; import {StepSchemaService} from "../helper"; import DataSourceItem from "../dataSource"; import CommonListItem from "@/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/common/list"; -import {InfoCircleOutlined} from "@ant-design/icons"; const SinkCassandraStepForm: React.FC> = ({data, visible, onVisibleChange, onOK}) => { const intl = getIntl(getLocale()); diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/source/source-cdc-oracle-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/source/source-cdc-oracle-step.tsx index 52a6de570..c178f8de2 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/source/source-cdc-oracle-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/source/source-cdc-oracle-step.tsx @@ -14,7 +14,7 @@ import { import {getIntl, getLocale} from "@umijs/max"; import {Node, XFlow} from '@antv/xflow'; import {ModalFormProps} from '@/typings'; -import {CDCParams, STEP_ATTR_TYPE} from '../constant'; +import {CDCParams, OracleCDCParams, STEP_ATTR_TYPE} from '../constant'; import {StepSchemaService} from '../helper'; import {DictDataService} from "@/services/admin/dictData.service"; import {DICT_TYPE} from "@/constants/dictType"; @@ -83,9 +83,9 @@ const SourceCDCOracleStepForm: React.FC> = ({data, visible, colProps={{span: 8}} /> > = ({data, visible, return DictDataService.listDictDataByType2(DICT_TYPE.seatunnelCDCFormat) }} /> + + );