Commit d6e7274
Merge branch '1.8_release_3.10.x' into 1.8_release-github
# Conflicts:
#	launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java
#	launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
dapeng committed Jun 23, 2020
2 parents a9f9299 + 31f1ea1 commit d6e7274
Showing 76 changed files with 2,174 additions and 185 deletions.
2 changes: 1 addition & 1 deletion ci/sonar_notify.sh
@@ -1,7 +1,7 @@
#!/bin/bash
# Reference: DingTalk docs https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
curl -s "https://oapi.dingtalk.com/robot/send?access_token=58fd731d8bed3b17708d3aa27e49a7e2c41c7e6545f6c4be3170963a7bba7e2a" \
-H "Content-Type: application/json" \
-d "{
\"msgtype\": \"markdown\",
@@ -67,8 +67,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10), new DTThreadFactory("clickhouseAsyncExec")));
}

}
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -93,33 +94,48 @@ public JobExecutionResult execute(String jobName) throws Exception {
// transform the streaming program into a JobGraph
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
return execute(streamGraph);
}

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {

JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setClasspaths(classpaths);
jobGraph.setAllowQueuedScheduling(true);

Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "512M");
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

// add (and override) the settings with what the user defined
configuration.addAll(this.conf);

MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
configBuilder.setConfiguration(configuration);
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}

int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();

if (LOG.isInfoEnabled()) {
LOG.info("Running job on local embedded Flink mini cluster");
}

try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
exec.start();
JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
MiniCluster miniCluster = new MiniCluster(cfg);

try {
miniCluster.start();
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
return jobExecutionResult;
} catch (Exception e) {
throw new RuntimeException(e);
miniCluster.close();
}
}
}
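Note: the hunk above interleaves the old and new bodies of execute(StreamGraph). Pulled together, the new local-execution path looks roughly like the sketch below; the wrapper class and method are illustrative, not part of this commit, but every Flink call mirrors one that appears in the diff.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterSketch {

    // Runs a prepared JobGraph on an embedded MiniCluster, following the new execute() flow.
    public static JobExecutionResult runLocally(JobGraph jobGraph, Configuration userConf) throws Exception {
        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
        configuration.addAll(userConf); // user settings override the defaults above

        if (!configuration.contains(RestOptions.BIND_PORT)) {
            configuration.setString(RestOptions.BIND_PORT, "0"); // bind the REST endpoint to any free port
        }

        int numSlotsPerTaskManager =
                configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
                .setConfiguration(configuration)
                .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
                .build();

        MiniCluster miniCluster = new MiniCluster(cfg);
        try {
            miniCluster.start();
            return miniCluster.executeJobBlocking(jobGraph);
        } finally {
            miniCluster.close(); // always shut the mini cluster down, even on failure
        }
    }
}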
@@ -200,14 +200,18 @@ private static void sqlTranslation(String localSqlPluginPath,

SideSqlExec sideSqlExec = new SideSqlExec();
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);

int scope = 0;
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result, scope + "");
scope++;
}

for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
if (LOG.isInfoEnabled()) {
LOG.info("exe-sql:\n" + result.getExecSql());
}

boolean isSide = false;
for (String tableName : result.getTargetTableList()) {
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -218,7 +222,7 @@ private static void sqlTranslation(String localSqlPluginPath,
SqlNode sqlNode = flinkPlanner.parse(realSql);
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
tmp.setExecSql(tmpSql);
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp, scope + "");
} else {
for (String sourceTable : result.getSourceTableList()) {
if (sideTableMap.containsKey(sourceTable)) {
@@ -228,7 +232,7 @@ private static void sqlTranslation(String localSqlPluginPath,
}
if (isSide) {
// the SQL references a dimension (side) table, so execute it through the side-table join path
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, null);
} else {
LOG.info("----------exec sql without dimension join-----------");
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
@@ -238,6 +242,8 @@ private static void sqlTranslation(String localSqlPluginPath,
}
}
}

scope++;
}
}
}
@@ -289,9 +295,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment

RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
.map((Tuple2<Boolean, Row> f0) -> {
return f0.f1;
})
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
.returns(typeInfo);

String fields = String.join(",", typeInfo.getFieldNames());
@@ -26,6 +26,7 @@
import org.apache.commons.lang.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.io.File;
@@ -102,8 +103,8 @@ public List<String> getProgramExeArgList() throws Exception {
continue;
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
File file = new File(value.toString());
String content = FileUtils.readFile(file, "UTF-8");
value = URLEncoder.encode(content, Charsets.UTF_8.name());
String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
}
args.add("-" + key);
args.add(value.toString());
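Note: the change above swaps Guava's Charsets and the bare "UTF-8" literal for java.nio.charset.StandardCharsets when reading and URL-encoding the SQL file. A self-contained, JDK-only sketch of the same idea; the file path is illustrative.

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class EncodeSqlSketch {
    public static void main(String[] args) throws Exception {
        // Read the SQL script as UTF-8 and URL-encode it so it can travel safely as a program argument.
        String content = new String(Files.readAllBytes(Paths.get("job.sql")), StandardCharsets.UTF_8);
        String encoded = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
        System.out.println(encoded);
    }
}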
@@ -41,6 +41,8 @@ public class CreateTableParser implements IParser {

private static final Pattern PATTERN = Pattern.compile(PATTERN_STR);

private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");

public static CreateTableParser newInstance(){
return new CreateTableParser();
}
@@ -69,18 +71,27 @@ public void parseSql(String sql, SqlTree sqlTree) {
}

private Map parseProp(String propsStr){
String[] strs = propsStr.trim().split("'\\s*,");
propsStr = propsStr.replaceAll("'\\s*,", "'|");
String[] strs = propsStr.trim().split("\\|");
Map<String, Object> propMap = Maps.newHashMap();
for(int i=0; i<strs.length; i++){
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
String key = ss.get(0).trim();
String value = ss.get(1).trim().replaceAll("'", "").trim();
String value = extractValue(ss.get(1).trim());
propMap.put(key, value);
}

return propMap;
}

private String extractValue(String value) {
Matcher matcher = PROP_PATTERN.matcher(value);
if (matcher.find()) {
return matcher.group(1);
}
throw new RuntimeException("[" + value + "] format is invalid");
}

public static class SqlParserResult{

private String tableName;
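Note: the rewritten parseProp first turns the separator between quoted values ('\s*,) into a pipe, splits on the pipe, and then validates each value against PROP_PATTERN, so a comma inside a quoted value (for example a JDBC URL) no longer breaks the split. A standalone sketch of that regex logic; the WITH-clause string is illustrative and the plain '=' split stands in for DtStringUtil.splitIgnoreQuota.

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PropParseSketch {

    private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");

    public static void main(String[] args) {
        String propsStr = "type ='mysql', url ='jdbc:mysql://localhost:3306/test', userName ='dtstack'";
        // Turn the quote-comma separator into '|', then split on the pipe.
        String normalized = propsStr.replaceAll("'\\s*,", "'|");
        Map<String, Object> propMap = new HashMap<>();
        for (String entry : normalized.trim().split("\\|")) {
            String[] kv = entry.split("=", 2);   // the real code splits with a quote-aware helper
            String key = kv[0].trim();
            Matcher m = PROP_PATTERN.matcher(kv[1].trim());
            if (!m.find()) {
                throw new RuntimeException("[" + kv[1] + "] format is invalid");
            }
            propMap.put(key, m.group(1));        // strip the surrounding quotes
        }
        System.out.println(propMap);             // three key/value pairs; HashMap order may vary
    }
}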
@@ -20,17 +20,18 @@

package com.dtstack.flink.sql.parser;

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlMatchRecognize;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlMatchRecognize;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Lists;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
@@ -49,6 +50,9 @@

public class InsertSqlParser implements IParser {

// flags whether the parent of the node currently being parsed is an INSERT node
private static Boolean parentIsInsert = false;

@Override
public boolean verify(String sql) {
return StringUtils.isNotBlank(sql) && sql.trim().toLowerCase().startsWith("insert");
@@ -78,13 +82,19 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
SqlNode sqlTarget = ((SqlInsert)sqlNode).getTargetTable();
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
sqlParseResult.addTargetTable(sqlTarget.toString());
parentIsInsert = true;
parseNode(sqlSource, sqlParseResult);
break;
case SELECT:
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
if(sqlFrom.getKind() == IDENTIFIER){
SqlSelect sqlSelect = (SqlSelect) sqlNode;
if (parentIsInsert) {
rebuildSelectNode(sqlSelect.getSelectList(), sqlSelect);
}
SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom();
if (sqlFrom.getKind() == IDENTIFIER) {
sqlParseResult.addSourceTable(sqlFrom.toString());
}else{
} else {
parentIsInsert = false;
parseNode(sqlFrom, sqlParseResult);
}
break;
@@ -141,6 +151,42 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
}
}

/**
* Wrap the SqlNodes in the top-level select list in AS nodes to resolve field-name conflicts.
* @param selectList the select items of the top-level SELECT node
* @param sqlSelect the top-level SELECT node produced by the parser
*/
private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelect) {
SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());

for (int index = 0; index < selectList.size(); index++) {
if (selectList.get(index).getKind().equals(SqlKind.AS)) {
sqlNodes.add(selectList.get(index));
continue;
}
sqlNodes.add(transformToAsNode(selectList.get(index)));
}
sqlSelect.setSelectList(sqlNodes);
}

/**
* Convert a SqlNode into an AS node.
* @param sqlNode the SqlNode to convert
* @return the rebuilt AS call node
*/
public static SqlBasicCall transformToAsNode(SqlNode sqlNode) {
String asName = "";
SqlParserPos pos = new SqlParserPos(sqlNode.getParserPosition().getLineNum(),
sqlNode.getParserPosition().getEndColumnNum());
if (sqlNode.getKind().equals(SqlKind.IDENTIFIER)) {
asName = ((SqlIdentifier) sqlNode).names.get(1);
}
SqlNode[] operands = new SqlNode[2];
operands[0] = sqlNode;
operands[1] = new SqlIdentifier(asName, null, pos);
return new SqlBasicCall(new SqlAsOperator(), operands, pos);
}

public static class SqlParseResult {

private List<String> sourceTableList = Lists.newArrayList();
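Note: transformToAsNode assumes a two-part identifier (names.get(1)) and wraps it in an explicit alias, so columns from different tables that share the same trailing name no longer collide once the select list is rebuilt. A small sketch of the node construction using only Calcite classes; the column reference is illustrative.

import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Arrays;

public class AsNodeSketch {
    public static void main(String[] args) {
        // A two-part column reference such as "a.id", as it appears in the parsed select list.
        SqlIdentifier column = new SqlIdentifier(Arrays.asList("a", "id"), SqlParserPos.ZERO);

        // Same construction as transformToAsNode: alias the column by its last name part.
        String asName = column.names.get(1);
        SqlNode[] operands = new SqlNode[] { column, new SqlIdentifier(asName, null, SqlParserPos.ZERO) };
        SqlBasicCall asCall = new SqlBasicCall(new SqlAsOperator(), operands, SqlParserPos.ZERO);

        // Renders roughly as `a`.`id` AS `id`; exact quoting depends on the dialect.
        System.out.println(asCall.toString());
    }
}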
@@ -68,7 +68,7 @@ public static SqlTree parseSql(String sql) throws Exception {
throw new RuntimeException("need to set local sql plugin root");
}

sql = sql.replaceAll("--.*", "")
sql = DtStringUtil.dealSqlComment(sql)
.replaceAll("\r\n", " ")
.replaceAll("\n", " ")
.replace("\t", " ").trim();
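Note: the old sql.replaceAll("--.*", "") also deleted everything after a "--" that happened to sit inside a quoted literal; DtStringUtil.dealSqlComment presumably strips line comments while respecting quotes. Its implementation is not shown on this page, so the following is only a hedged sketch of one way such a helper can behave.

public class SqlCommentSketch {

    // Illustrative only: removes "-- ..." line comments but leaves single-quoted literals untouched.
    public static String stripLineComments(String sql) {
        StringBuilder out = new StringBuilder();
        boolean inQuote = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'') {
                inQuote = !inQuote;
            }
            if (!inQuote && c == '-' && i + 1 < sql.length() && sql.charAt(i + 1) == '-') {
                // Skip to the end of the line; the newline itself is kept.
                while (i < sql.length() && sql.charAt(i) != '\n') {
                    i++;
                }
                if (i < sql.length()) {
                    out.append('\n');
                }
                continue;
            }
            out.append(c);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String sql = "select '--not a comment' as col from tab -- real comment\nwhere id > 0";
        System.out.println(stripLineComments(sql));
    }
}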
@@ -22,6 +22,7 @@

import com.dtstack.flink.sql.enums.ECacheContentType;
import com.dtstack.flink.sql.enums.ECacheType;
import com.dtstack.flink.sql.factory.DTThreadFactory;
import com.dtstack.flink.sql.metric.MetricConstant;
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
import com.dtstack.flink.sql.side.cache.CacheObj;
@@ -68,6 +69,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
private int timeOutNum = 0;
protected BaseSideInfo sideInfo;
protected transient Counter parseErrorRecords;
private transient ThreadPoolExecutor cancelExecutor;

public BaseAsyncReqRow(BaseSideInfo sideInfo){
this.sideInfo = sideInfo;
@@ -82,6 +84,8 @@ public void open(Configuration parameters) throws Exception {
super.open(parameters);
initCache();
initMetric();
cancelExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100000),
new DTThreadFactory("cancel-timer-executor"));
LOG.info("async dim table config info: {} ", sideInfo.getSideTableInfo().toString());
}

@@ -248,12 +252,11 @@ public void onProcessingTime(long timestamp) throws Exception {
}

protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, ScheduledFuture<?> timerFuture){
ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
if(resultFuture instanceof StreamRecordQueueEntry){
StreamRecordQueueEntry streamRecordBufferEntry = (StreamRecordQueueEntry) resultFuture;
streamRecordBufferEntry.onComplete((Object value) -> {
timerFuture.cancel(true);
},executors);
}, cancelExecutor);
}
}

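Note: cancelTimerWhenComplete used to build a fresh ThreadPoolExecutor for every async lookup, leaking a thread pool per record; the fix creates one single-threaded cancelExecutor in open() and reuses it for every completion callback. A JDK-only sketch of the same pattern, with CompletableFuture standing in for Flink's StreamRecordQueueEntry; all names here are illustrative.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CancelTimerSketch {

    // One shared single-threaded executor for all completion callbacks, created once (e.g. in open()).
    private final ExecutorService cancelExecutor = Executors.newSingleThreadExecutor();
    private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();

    void lookup(CompletableFuture<String> resultFuture) {
        // Per-lookup timeout timer.
        ScheduledFuture<?> timerFuture = timerService.schedule(() -> {
            resultFuture.completeExceptionally(new RuntimeException("lookup timed out"));
        }, 10, TimeUnit.SECONDS);

        // Cancel the timer on the shared executor once the lookup completes,
        // instead of spinning up a new ThreadPoolExecutor per record.
        resultFuture.whenCompleteAsync((value, error) -> timerFuture.cancel(true), cancelExecutor);
    }
}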
17 changes: 15 additions & 2 deletions core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
@@ -20,6 +20,7 @@

package com.dtstack.flink.sql.side;

import com.dtstack.flink.sql.util.TableUtils;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Maps;
import org.apache.calcite.sql.JoinType;
@@ -66,6 +67,8 @@ public class JoinInfo implements Serializable {

private JoinType joinType;

private String scope = "";

/**
* Fields to query from the left table and the corresponding column names in the output
*/
@@ -96,12 +99,14 @@ public String getNewTableName(){
// handle the case where the left table is referenced through an AS alias
String leftStr = leftTableName;
leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
return leftStr + "_" + rightTableName;
String newName = leftStr + "_" + rightTableName;
return TableUtils.buildTableNameWithScope(newName, scope);
}


public String getNewTableAlias(){
return leftTableAlias + "_" + rightTableAlias;
String newName = leftTableAlias + "_" + rightTableAlias;
return TableUtils.buildTableNameWithScope(newName, scope);
}

public boolean isLeftIsSideTable() {
@@ -233,6 +238,14 @@ public HashBasedTable<String, String, String> getTableFieldRef(){
return mappingTable;
}

public String getScope() {
return scope;
}

public void setScope(String scope) {
this.scope = scope;
}

@Override
public String toString() {
return "JoinInfo{" +
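Note: getNewTableName and getNewTableAlias now go through TableUtils.buildTableNameWithScope, which receives the per-statement scope counter introduced in sqlTranslation above, so the intermediate table generated for a join in one INSERT no longer collides with the same join in another. The helper itself is not shown on this page; the sketch below is only a guess at its shape, purely for illustration.

public final class TableNameScopeSketch {

    // Illustrative only: append the scope suffix when one is present.
    public static String buildTableNameWithScope(String tableName, String scope) {
        if (scope == null || scope.trim().isEmpty()) {
            return tableName;
        }
        return tableName + "_" + scope;
    }

    public static void main(String[] args) {
        // The same left/right join yields distinct intermediate names per statement scope.
        System.out.println(buildTableNameWithScope("orders_dim_user", "0")); // orders_dim_user_0
        System.out.println(buildTableNameWithScope("orders_dim_user", "1")); // orders_dim_user_1
    }
}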