Skip to content

Commit

Permalink
Merge branch '1.8_release_3.9.x' into 1.8_release_3.10.x
Browse files Browse the repository at this point in the history
  • Loading branch information
gituser committed Mar 3, 2020
2 parents 0dbf0d4 + 93f6b7d commit a83e544
Show file tree
Hide file tree
Showing 12 changed files with 1,467 additions and 586 deletions.
10 changes: 7 additions & 3 deletions core/src/main/java/com/dtstack/flink/sql/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
SideSqlExec sideSqlExec = new SideSqlExec();
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
}

for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
Expand All @@ -169,7 +169,7 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
tmp.setExecSql(tmpSql);
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
} else {
for (String sourceTable : result.getSourceTableList()) {
if (sideTableMap.containsKey(sourceTable)) {
Expand All @@ -179,10 +179,14 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
}
if (isSide) {
//sql-dimensional table contains the dimension table of execution
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
}else{
System.out.println("----------exec sql without dimension join-----------" );
System.out.println("----------real sql exec is--------------------------");
System.out.println(result.getExecSql());
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
if(LOG.isInfoEnabled()){
System.out.println();
LOG.info("exec sql: " + result.getExecSql());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
package com.dtstack.flink.sql.side;

import com.google.common.collect.HashBasedTable;
import org.apache.commons.lang3.StringUtils;

/**
* Reason:
* 用于记录转换之后的表和原来表直接字段的关联关系
* Date: 2018/8/30
* Company: www.dtstack.com
* @author xuchao
Expand All @@ -37,6 +38,8 @@ public class FieldReplaceInfo {

private String targetTableAlias = null;

private FieldReplaceInfo preNode = null;

public void setMappingTable(HashBasedTable<String, String, String> mappingTable) {
this.mappingTable = mappingTable;
}
Expand All @@ -57,7 +60,39 @@ public String getTargetTableAlias() {
return targetTableAlias;
}

public FieldReplaceInfo getPreNode() {
return preNode;
}

public void setPreNode(FieldReplaceInfo preNode) {
this.preNode = preNode;
}

public void setTargetTableAlias(String targetTableAlias) {
this.targetTableAlias = targetTableAlias;
}

/**
* 根据原始的tableName + fieldName 获取转换之后的fieldName
* @param tableName
* @param fieldName
* @return
*/
public String getTargetFieldName(String tableName, String fieldName){
String targetFieldName = mappingTable.get(tableName, fieldName);
if(StringUtils.isNotBlank(targetFieldName)){
return targetFieldName;
}

if(preNode == null){
return null;
}

String preNodeTargetFieldName = preNode.getTargetFieldName(tableName, fieldName);
if(StringUtils.isBlank(preNodeTargetFieldName)){
return null;
}

return mappingTable.get(preNode.getTargetTableAlias(), preNodeTargetFieldName);
}
}
46 changes: 27 additions & 19 deletions core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class JoinInfo implements Serializable {

//左表是否是维表
private boolean leftIsSideTable;
//左表是 转换后的中间表
private boolean leftIsMidTable;

private boolean leftIsTmpTable = false;

//右表是否是维表
private boolean rightIsSideTable;
Expand All @@ -66,8 +66,6 @@ public class JoinInfo implements Serializable {
private SqlNode selectNode;

private JoinType joinType;
// 左边是中间转换表,做表映射关系,给替换属性名称使用
private Map<String, String> leftTabMapping;

public String getSideTableName(){
if(leftIsSideTable){
Expand All @@ -92,21 +90,6 @@ public String getNewTableName(){
return leftStr + "_" + rightTableName;
}

public boolean isLeftIsMidTable() {
return leftIsMidTable;
}

public void setLeftIsMidTable(boolean leftIsMidTable) {
this.leftIsMidTable = leftIsMidTable;
}

public Map<String, String> getLeftTabMapping() {
return leftTabMapping;
}

public void setLeftTabMapping(Map<String, String> leftTabMapping) {
this.leftTabMapping = leftTabMapping;
}

public String getNewTableAlias(){
return leftTableAlias + "_" + rightTableAlias;
Expand Down Expand Up @@ -211,4 +194,29 @@ public JoinType getJoinType() {
public void setJoinType(JoinType joinType) {
this.joinType = joinType;
}

public boolean isLeftIsTmpTable() {
return leftIsTmpTable;
}

public void setLeftIsTmpTable(boolean leftIsTmpTable) {
this.leftIsTmpTable = leftIsTmpTable;
}

@Override
public String toString() {
return "JoinInfo{" +
"leftIsSideTable=" + leftIsSideTable +
", leftIsTmpTable=" + leftIsTmpTable +
", rightIsSideTable=" + rightIsSideTable +
", leftTableName='" + leftTableName + '\'' +
", leftTableAlias='" + leftTableAlias + '\'' +
", rightTableName='" + rightTableName + '\'' +
", rightTableAlias='" + rightTableAlias + '\'' +
", condition=" + condition +
", selectFields=" + selectFields +
", selectNode=" + selectNode +
", joinType=" + joinType +
'}';
}
}
Loading

0 comments on commit a83e544

Please sign in to comment.