From 99289fc33432b7695134044b912222ba6d8db8b2 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Sat, 5 Oct 2024 17:00:20 +0800 Subject: [PATCH 1/6] [fixed] fixed envsetting unload flink-conf.yaml,if not like catalogstore conf will not work --- .../streampark/flink/core/FlinkTableInitializer.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index 7cb463ed75..8e1e19b7b5 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -118,6 +118,12 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType } } + parameter.get(KEY_FLINK_CONF(), null) match { + case null | "" => + throw new ExceptionInInitializerError( + "[StreamPark] Usage:can't find config,please set \"--flink.conf $conf \" in main arguments") + case conf => builder.withConfiguration(Configuration.fromMap(PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(conf)))) + } val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE)) buildWith match { From e4f932e5e4eb1a79c502916db91dd4483a80440f Mon Sep 17 00:00:00 2001 From: wulin Date: Sun, 12 Jan 2025 14:32:34 +0800 Subject: [PATCH 2/6] [Feature] Add CDC jar client for support cdc yaml --- pom.xml | 1 + .../common/util/PropertiesUtils.scala | 6 + .../streampark-flink-cdcclient/pom.xml | 124 ++++++++++++++++++ .../apache/streampark/cdc/cli/CDCClient.java | 80 +++++++++++ .../streampark/cdc/cli/CDCExecutor.java | 69 ++++++++++ .../org/apache/streampark/package-info.java | 1 + 6 files changed, 281 insertions(+) create mode 100644 streampark-flink/streampark-flink-cdcclient/pom.xml create mode 100644 streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java create mode 100644 streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java create mode 100644 streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java diff --git a/pom.xml b/pom.xml index 4a65812bae..3c4cbf74ec 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,7 @@ streampark-flink streampark-spark streampark-console + streampark-flink/streampark-flink-cdcclient diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index f7f68ae458..08eeeb6fdf 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -101,6 +101,12 @@ object PropertiesUtils extends Logger { } } + def fromYamlTextToJava(text: String): java.util.Map[String, String] = { + val scalaMap = fromYamlText(text) + val javaMap: java.util.Map[String, String] = scalaMap.asJava + javaMap + } + def fromHoconText(conf: String): Map[String, String] = { require(conf != null, s"[StreamPark] fromHoconText: Hocon content must not be null") try parseHoconByReader(new StringReader(conf)) diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml b/streampark-flink/streampark-flink-cdcclient/pom.xml new file mode 100644 index 0000000000..53a2adf714 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/pom.xml @@ -0,0 +1,124 @@ + + + 4.0.0 + + org.apache.streampark + streampark + 2.2.0-SNAPSHOT + + + streampark-flink-cdc-client_${scala.binary.version} + 2.2.0-SNAPSHOT + StreamPark : Flink CDC Client + + + 3.2.1 + + + + + org.apache.flink + flink-cdc-common + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-runtime + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-cli + ${flink.cdc.version} + + + org.apache.flink + flink-cdc-composer + ${flink.cdc.version} + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + org.apache.streampark + streampark-common_${scala.binary.version} + + + org.apache.flink + flink-yarn + 1.18.1 + provided + + + org.slf4j + slf4j-api + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-dist + + shade + + package + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + false + false + false + + + * + + + + + org.apache.commons.cli + org.apache.flink.cdc.shaded.com.apache.commons.cli + + + org.apache.calcite + org.apache.flink.cdc.shaded.org.apache.calcite + + + + + org.apache.streampark.cdc.CDCClient + + + + + + + + + + diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java new file mode 100644 index 0000000000..5d31a99187 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.cdc.cli; + +import org.apache.streampark.common.conf.ConfigKeys; +import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.util.PropertiesUtils; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.util.StringUtils; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * cdc client + */ +public class CDCClient { + + private static final Logger LOG = LoggerFactory.getLogger(CDCClient.class); + + public static void main(String[] args) throws Exception { + ParameterTool parameter = ParameterTool.fromArgs(args); + Map configMap = new HashMap<>(); + String cdcYamlDecode = parameter.get(ConfigKeys.KEY_FLINK_SQL(null)); + String appNameDecode = parameter.get(ConfigKeys.KEY_APP_NAME(null)); + String flinkConfigDecode = parameter.get(ConfigKeys.KEY_FLINK_CONF(null)); + String parallelism = parameter.get(ConfigKeys.KEY_FLINK_PARALLELISM(null)); + if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode) + || StringUtils.isNullOrWhitespaceOnly(appNameDecode) + || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) { + LOG.error("--flink.conf or --app.name must not be null."); + return; + } + + String cdcYaml = DeflaterUtils.unzipString(cdcYamlDecode); + String appName = DeflaterUtils.unzipString(appNameDecode); + String flinkConfigString = DeflaterUtils.unzipString(flinkConfigDecode); + configMap.putAll(PropertiesUtils.fromYamlTextAsJava(flinkConfigString)); + configMap.put(YarnConfigOptions.APPLICATION_NAME.key(), appName); + configMap.put(CoreOptions.DEFAULT_PARALLELISM.key(), parallelism); + Configuration flinkConfig = Configuration.fromMap(configMap); + LOG.debug("Flink cdc config {}", flinkConfig); + LOG.debug("Flink cdc yaml {}", cdcYaml); + PipelineExecution.ExecutionInfo result = + new CDCExecutor(cdcYaml, flinkConfig, new ArrayList<>(), SavepointRestoreSettings.none()).run(); + printExecutionInfo(result); + + } + + private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) { + System.out.println("Pipeline has been submitted to cluster."); + System.out.printf("Job ID: %s\n", info.getId()); + System.out.printf("Job Description: %s\n", info.getDescription()); + } +} diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java new file mode 100644 index 0000000000..973b869e8b --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.cdc.cli; + +import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser; +import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser; +import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.composer.PipelineComposer; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; + +import java.nio.file.Path; +import java.util.List; + +/** + * cdc executor + */ +public class CDCExecutor { + + private final String pipelineString; + private final Configuration configuration; + private final SavepointRestoreSettings savePointSettings; + private final List additionalJar; + + private PipelineComposer composer; + + public CDCExecutor(String pipelineString, + Configuration flinkConfig, + List additionalJar, + SavepointRestoreSettings savePointRestoreSettings) { + this.pipelineString = pipelineString; + this.configuration = flinkConfig; + this.additionalJar = additionalJar; + this.savePointSettings = savePointRestoreSettings; + } + + public PipelineExecution.ExecutionInfo run() throws Exception { + PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = pipelineDefinitionParser.parse(pipelineString, configuration); + PipelineComposer composer = getComposer(); + PipelineExecution execution = composer.compose(pipelineDef); + return execution.execute(); + } + + private PipelineComposer getComposer() throws Exception { + if (composer == null) { + return FlinkEnvironmentUtils.createComposer( + true, configuration, additionalJar, savePointSettings); + } + return composer; + } +} diff --git a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java new file mode 100644 index 0000000000..0ef7971d70 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java @@ -0,0 +1 @@ +package org.apache.streampark; From f8d3a7ccded1c6166bac7c606add96a79b6072a8 Mon Sep 17 00:00:00 2001 From: wulin Date: Sun, 12 Jan 2025 15:32:55 +0800 Subject: [PATCH 3/6] [Feature] The front-end is modified to support the cdc yaml api --- .../common/constants/Constants.java | 6 ++++- .../console/core/util/ServiceHelper.java | 24 +++++++++++++++++++ .../streampark-flink-cdcclient/pom.xml | 2 +- .../org/apache/streampark/package-info.java | 1 - 4 files changed, 30 insertions(+), 3 deletions(-) delete mode 100644 streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java diff --git a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java index b6d84e2727..8162890605 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java @@ -19,7 +19,9 @@ import java.time.Duration; -/** A constant class to hold the constants variables. */ +/** + * A constant class to hold the constants variables. + */ public final class Constants { private Constants() { @@ -51,6 +53,8 @@ private Constants() { public static final String STREAMPARK_SPARKSQL_CLIENT_CLASS = "org.apache.streampark.spark.cli.SqlClient"; + public static final String STREAMPARK_FLINKCDC_CLIENT_CLASS = "org.apache.streampark.cdc.cli.CDCClient"; + public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"; public static final String SINGLE_SLASH = "/"; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java index 252b7364e1..02a6e8ab39 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java @@ -39,6 +39,8 @@ public class ServiceHelper { private static String flinkSqlClientJar = null; + private static String flinkCDCClientJar = null; + private static String sparkSqlClientJar = null; public static User getLoginUser() { @@ -58,6 +60,28 @@ public static Long getUserId() { return null; } + public static String getFlinkCDCClientJar(FlinkEnv flinkEnv) { + if (flinkCDCClientJar == null) { + File localClient = WebUtils.getAppClientDir(); + ApiAlertException.throwIfFalse(localClient.exists(), + "[StreamPark]" + localClient + " no exists. please check."); + String regex = String.format("streampark-flink-cdcclient_%s-.*\\.jar", flinkEnv.getScalaVersion()); + + List jars = Arrays.stream(Objects.requireNonNull(localClient.list())) + .filter(x -> x.matches(regex)) + .collect(Collectors.toList()); + ApiAlertException.throwIfTrue( + jars.isEmpty(), + "[StreamPark] can't found streampark-flink-cdcclient jar in " + localClient); + + ApiAlertException.throwIfTrue( + jars.size() > 1, + "[StreamPark] found multiple streampark-flink-cdclient jar in " + localClient); + flinkCDCClientJar = jars.get(0); + } + return flinkCDCClientJar; + } + public static String getFlinkSqlClientJar(FlinkEnv flinkEnv) { if (flinkSqlClientJar == null) { File localClient = WebUtils.getAppClientDir(); diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml b/streampark-flink/streampark-flink-cdcclient/pom.xml index 53a2adf714..060cca6e14 100644 --- a/streampark-flink/streampark-flink-cdcclient/pom.xml +++ b/streampark-flink/streampark-flink-cdcclient/pom.xml @@ -111,7 +111,7 @@ - org.apache.streampark.cdc.CDCClient + org.apache.streampark.cdc.cli.CDCClient diff --git a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java deleted file mode 100644 index 0ef7971d70..0000000000 --- a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package org.apache.streampark; From 951aced67057f9819d1f365bc5ac2d326ad7a6a5 Mon Sep 17 00:00:00 2001 From: wulin Date: Mon, 13 Jan 2025 20:29:55 +0800 Subject: [PATCH 4/6] fixed package and log info --- .../streampark/common/constants/Constants.java | 2 +- .../streampark-flink-cdcclient/pom.xml | 18 +++++++++++++++++- .../{ => flink}/cdc/cli/CDCClient.java | 12 +++++------- .../{ => flink}/cdc/cli/CDCExecutor.java | 2 +- .../streampark/flink/cdc/cli/package-info.java | 18 ++++++++++++++++++ 5 files changed, 42 insertions(+), 10 deletions(-) rename streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/{ => flink}/cdc/cli/CDCClient.java (96%) rename streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/{ => flink}/cdc/cli/CDCExecutor.java (98%) create mode 100644 streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java diff --git a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java index 8162890605..c8f27bcb5a 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java @@ -53,7 +53,7 @@ private Constants() { public static final String STREAMPARK_SPARKSQL_CLIENT_CLASS = "org.apache.streampark.spark.cli.SqlClient"; - public static final String STREAMPARK_FLINKCDC_CLIENT_CLASS = "org.apache.streampark.cdc.cli.CDCClient"; + public static final String STREAMPARK_FLINKCDC_CLIENT_CLASS = "org.apache.streampark.flink.cdc.cli.CDCClient"; public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"; diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml b/streampark-flink/streampark-flink-cdcclient/pom.xml index 060cca6e14..dced335426 100644 --- a/streampark-flink/streampark-flink-cdcclient/pom.xml +++ b/streampark-flink/streampark-flink-cdcclient/pom.xml @@ -1,4 +1,20 @@ + 4.0.0 @@ -111,7 +127,7 @@ - org.apache.streampark.cdc.cli.CDCClient + org.apache.streampark.flink.cdc.cli.CDCClient diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java similarity index 96% rename from streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java rename to streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java index 5d31a99187..9f9414b685 100644 --- a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCClient.java +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java @@ -15,11 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.cdc.cli; - -import org.apache.streampark.common.conf.ConfigKeys; -import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.common.util.PropertiesUtils; +package org.apache.streampark.flink.cdc.cli; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.cdc.common.configuration.Configuration; @@ -28,7 +24,9 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.util.StringUtils; import org.apache.flink.yarn.configuration.YarnConfigOptions; - +import org.apache.streampark.common.conf.ConfigKeys; +import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.util.PropertiesUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +51,7 @@ public static void main(String[] args) throws Exception { if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode) || StringUtils.isNullOrWhitespaceOnly(appNameDecode) || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) { - LOG.error("--flink.conf or --app.name must not be null."); + LOG.error("--flink.conf or --app.name or --sql as cdc yaml must not be null."); return; } diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java similarity index 98% rename from streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java rename to streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java index 973b869e8b..78a46d7a84 100644 --- a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/cdc/cli/CDCExecutor.java +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.cdc.cli; +package org.apache.streampark.flink.cdc.cli; import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser; import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser; diff --git a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java new file mode 100644 index 0000000000..b85a06e754 --- /dev/null +++ b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.cdc.cli; From eeb763acfda72a293c222200d2b9b4db173b2fae Mon Sep 17 00:00:00 2001 From: wulin Date: Mon, 13 Jan 2025 20:35:19 +0800 Subject: [PATCH 5/6] delete unused method --- .../streampark/common/util/PropertiesUtils.scala | 6 ------ .../org/apache/streampark/flink/cdc/cli/CDCClient.java | 10 ++++++---- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 08eeeb6fdf..f7f68ae458 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -101,12 +101,6 @@ object PropertiesUtils extends Logger { } } - def fromYamlTextToJava(text: String): java.util.Map[String, String] = { - val scalaMap = fromYamlText(text) - val javaMap: java.util.Map[String, String] = scalaMap.asJava - javaMap - } - def fromHoconText(conf: String): Map[String, String] = { require(conf != null, s"[StreamPark] fromHoconText: Hocon content must not be null") try parseHoconByReader(new StringReader(conf)) diff --git a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java index 9f9414b685..3793595483 100644 --- a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java +++ b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java @@ -17,6 +17,10 @@ package org.apache.streampark.flink.cdc.cli; +import org.apache.streampark.common.conf.ConfigKeys; +import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.util.PropertiesUtils; + import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.PipelineExecution; @@ -24,9 +28,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.util.StringUtils; import org.apache.flink.yarn.configuration.YarnConfigOptions; -import org.apache.streampark.common.conf.ConfigKeys; -import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.common.util.PropertiesUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +53,7 @@ public static void main(String[] args) throws Exception { if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode) || StringUtils.isNullOrWhitespaceOnly(appNameDecode) || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) { - LOG.error("--flink.conf or --app.name or --sql as cdc yaml must not be null."); + LOG.error("--flink.conf or --app.name or `cdc yaml` must not be null."); return; } From d7f0444991b4545bbab2c9a5d8762fe3c066e45b Mon Sep 17 00:00:00 2001 From: wulin Date: Mon, 13 Jan 2025 21:18:55 +0800 Subject: [PATCH 6/6] fixed module --- pom.xml | 1 - streampark-flink/pom.xml | 1 + streampark-flink/streampark-flink-cdcclient/pom.xml | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3c4cbf74ec..4a65812bae 100644 --- a/pom.xml +++ b/pom.xml @@ -67,7 +67,6 @@ streampark-flink streampark-spark streampark-console - streampark-flink/streampark-flink-cdcclient diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml index 7976177a70..1543627153 100644 --- a/streampark-flink/pom.xml +++ b/streampark-flink/pom.xml @@ -39,6 +39,7 @@ streampark-flink-kubernetes streampark-flink-catalog-store streampark-flink-connector-plugin + streampark-flink-cdcclient diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml b/streampark-flink/streampark-flink-cdcclient/pom.xml index dced335426..ca972a301b 100644 --- a/streampark-flink/streampark-flink-cdcclient/pom.xml +++ b/streampark-flink/streampark-flink-cdcclient/pom.xml @@ -20,7 +20,7 @@ 4.0.0 org.apache.streampark - streampark + streampark-flink 2.2.0-SNAPSHOT