Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feat_1.8_rowkey' into 1.8_test_3.10.x

Browse files Browse the repository at this point in the history

# Conflicts:
#	core/pom.xml
#	core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
#	launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
#	rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
  • Loading branch information
dapeng committed Jun 4, 2020
2 parents 48a12f2 + 75f0349 commit 1f467e8
Show file tree
Hide file tree
Showing 29 changed files with 925 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment

RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
.map((Tuple2<Boolean, Row> f0) -> {
return f0.f1;
})
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
.returns(typeInfo);

String fields = String.join(",", typeInfo.getFieldNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.lang.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.io.File;
Expand Down Expand Up @@ -102,8 +103,8 @@ public List<String> getProgramExeArgList() throws Exception {
continue;
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
File file = new File(value.toString());
String content = FileUtils.readFile(file, "UTF-8");
value = URLEncoder.encode(content, Charsets.UTF_8.name());
String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
}
args.add("-" + key);
args.add(value.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class CreateTableParser implements IParser {

private static final Pattern PATTERN = Pattern.compile(PATTERN_STR);

private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");

public static CreateTableParser newInstance(){
return new CreateTableParser();
}
Expand Down Expand Up @@ -69,18 +71,27 @@ public void parseSql(String sql, SqlTree sqlTree) {
}

private Map parseProp(String propsStr){
String[] strs = propsStr.trim().split("'\\s*,");
propsStr = propsStr.replaceAll("'\\s*,", "'|");
String[] strs = propsStr.trim().split("\\|");
Map<String, Object> propMap = Maps.newHashMap();
for(int i=0; i<strs.length; i++){
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
String key = ss.get(0).trim();
String value = ss.get(1).trim().replaceAll("'", "").trim();
String value = extractValue(ss.get(1));
propMap.put(key, value);
}

return propMap;
}

/**
 * Strips the enclosing single quotes from a table-property value using
 * {@code PROP_PATTERN} and returns the quoted content.
 *
 * @param value raw right-hand side of a property assignment, expected as {@code '...'}
 * @return the text captured between the quotes
 * @throws RuntimeException when {@code value} does not match the expected quoted format
 */
private String extractValue(String value) {
    Matcher matcher = PROP_PATTERN.matcher(value);
    if (!matcher.find()) {
        // Reject anything that is not wrapped in single quotes.
        throw new RuntimeException("[" + value + "] format is invalid");
    }
    return matcher.group(1);
}

public static class SqlParserResult{

private String tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,12 @@ public void exec(String sql,

SideSQLParser sideSQLParser = new SideSQLParser();
sideSQLParser.setLocalTableCache(localTableCache);
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet(), scope);
Object pollObj = null;
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
Object pollObj;

//need clean
boolean preIsSideJoin = false;
List<FieldReplaceInfo> replaceInfoList = Lists.newArrayList();

while((pollObj = exeQueue.poll()) != null){

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@
public class WaterMarkerAssigner {

public boolean checkNeedAssignWaterMarker(AbstractSourceTableInfo tableInfo){
if(Strings.isNullOrEmpty(tableInfo.getEventTimeField())){
return false;
}

return true;
return !Strings.isNullOrEmpty(tableInfo.getEventTimeField());
}

public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo typeInfo, AbstractSourceTableInfo sourceTableInfo){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public ReplaceInfo getReplaceInfo(String field){
}

private List<ReplaceInfo> makeFormula(String formula){
if (formula == null || formula.length() <= 0) {
if(formula == null || formula.length() <= 0){
return Lists.newArrayList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.dtstack.flink.sql.enums.EUpdateMode;
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
Expand All @@ -41,7 +40,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public ReplaceInfo getReplaceInfo(String field){

private List<ReplaceInfo> makeFormula(String formula){
if(formula == null || formula.length() <= 0){
Lists.newArrayList();
return Lists.newArrayList();
}
List<ReplaceInfo> result = Lists.newArrayList();
for(String meta: splitIgnoreQuotaBrackets(formula, "\\+")){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,18 +21,15 @@
import com.dtstack.flink.sql.enums.ClusterMode;
import com.dtstack.flink.sql.option.Options;
import com.dtstack.flink.sql.util.PluginUtil;
import com.esotericsoftware.minlog.Log;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
Expand All @@ -42,21 +39,35 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Iterator;

/**
* @author sishu.yss
*/
public class ClusterClientFactory {

private static final Logger LOG = LoggerFactory.getLogger(ClusterClientFactory.class);

private static final String HA_CLUSTER_ID = "high-availability.cluster-id";

private static final String HIGH_AVAILABILITY = "high-availability";

private static final String NODE = "NONE";

private static final String ZOOKEEPER = "zookeeper";

private static final String HADOOP_CONF = "fs.hdfs.hadoopconf";

public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
String mode = launcherOptions.getMode();
if (mode.equals(ClusterMode.standalone.name())) {
Expand All @@ -70,10 +81,12 @@ public static ClusterClient createClusterClient(Options launcherOptions) throws
public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
String flinkConfDir = launcherOptions.getFlinkconf();
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
configBuilder.setConfiguration(config);
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);

LOG.info("------------config params-------------------------");
config.toMap().forEach((key, value) -> LOG.info("{}: {}", key, value));
LOG.info("-------------------------------------------");

RestClusterClient clusterClient = new RestClusterClient<>(config, "clusterClient");
LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName());
Expand All @@ -89,18 +102,21 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {

if (StringUtils.isNotBlank(yarnConfDir)) {
try {
config.setString("fs.hdfs.hadoopconf", yarnConfDir);
boolean isHighAvailability;

config.setString(HADOOP_CONF, yarnConfDir);
FileSystem.initialize(config);

YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
ApplicationId applicationId = null;
ApplicationId applicationId;

String yarnSessionConf = launcherOptions.getYarnSessionConf();
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);

Object yid = yarnSessionConfProperties.get("yid");

if (null != yid) {
Expand All @@ -109,20 +125,30 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
applicationId = getYarnClusterApplicationId(yarnClient);
}

Log.info("applicationId={}", applicationId.toString());
LOG.info("current applicationId = {}", applicationId.toString());

if (StringUtils.isEmpty(applicationId.toString())) {
throw new RuntimeException("No flink session found on yarn cluster.");
}

isHighAvailability = config.getString(HIGH_AVAILABILITY, NODE).equals(ZOOKEEPER);

if (isHighAvailability && config.getString(HA_CLUSTER_ID, null) == null) {
config.setString(HA_CLUSTER_ID, applicationId.toString());
}

LOG.info("------------config params-------------------------");
config.toMap().forEach((key, value) -> LOG.info("{}: {}", key, value));
LOG.info("-------------------------------------------");

AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, flinkConfDir, yarnClient, false);
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
clusterClient.setDetached(true);
return clusterClient;
} catch (Exception e) {
throw new RuntimeException(e);
}
}else{
} else {
throw new RuntimeException("yarn mode must set param of 'yarnconf'!!!");
}
}
Expand Down Expand Up @@ -158,7 +184,7 @@ private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient)

}

if (null == applicationId) {
if (applicationId == null || StringUtils.isEmpty(applicationId.toString())) {
throw new RuntimeException("No flink session found on yarn cluster.");
}
return applicationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/



package com.dtstack.flink.sql.launcher;

Expand Down Expand Up @@ -55,6 +54,7 @@
/**
* Date: 2017/2/20
* Company: www.dtstack.com
*
* @author xuchao
*/

Expand All @@ -63,19 +63,17 @@ public class LauncherMain {
private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);
private static final String CORE_JAR = "core";

private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);

private static String SP = File.separator;

private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
String corePath = localSqlRootJar + SP + jarPath;
return corePath;
return localSqlRootJar + SP + jarPath;
}

public static void main(String[] args) throws Exception {

LOG.info("----start----");

if (args.length == 1 && args[0].endsWith(".json")){
if (args.length == 1 && args[0].endsWith(".json")) {
args = parseJson(args);
}

Expand All @@ -88,28 +86,33 @@ public static void main(String[] args) throws Exception {
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);

if(mode.equals(ClusterMode.local.name())) {
String[] localArgs = argList.toArray(new String[argList.size()]);
LOG.info("current job mode is {}", mode);

if (mode.equals(ClusterMode.local.name())) {
String[] localArgs = argList.toArray(new String[0]);
Main.main(localArgs);
return;
}

String pluginRoot = launcherOptions.getLocalSqlPluginPath();
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
String[] remoteArgs = argList.toArray(new String[argList.size()]);
String[] remoteArgs = argList.toArray(new String[0]);
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);

String savePointPath = confProperties.getProperty(ConfigConstrant.SAVE_POINT_PATH_KEY);
if(StringUtils.isNotBlank(savePointPath)){
if (StringUtils.isNotBlank(savePointPath)) {
String allowNonRestoredState = confProperties.getOrDefault(ConfigConstrant.ALLOW_NON_RESTORED_STATE_KEY, "false").toString();
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savePointPath, BooleanUtils.toBoolean(allowNonRestoredState)));
}

if(mode.equals(ClusterMode.yarnPer.name())){
if (mode.equals(ClusterMode.yarnPer.name())) {
String flinkConfDir = launcherOptions.getFlinkconf();
Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1);
PerJobSubmitter.submit(launcherOptions, jobGraph, config);

LOG.info("current jobID is {}", jobGraph.getJobID());

LOG.info("submit applicationId is {}", PerJobSubmitter.submit(launcherOptions, jobGraph, config));
} else {
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
clusterClient.run(program, 1);
Expand All @@ -127,7 +130,6 @@ private static String[] parseJson(String[] args) throws IOException {
list.add("-" + entry.getKey());
list.add(entry.getValue().toString());
}
String[] array = list.toArray(new String[list.size()]);
return array;
return list.toArray(new String[0]);
}
}
Loading

0 comments on commit 1f467e8

Please sign in to comment.