Skip to content

Commit

Permalink
fix kudu and async quest
Browse files Browse the repository at this point in the history
  • Loading branch information
dapeng committed May 15, 2020
1 parent bbd8cf9 commit 7abe4df
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.BaseSideInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
import com.dtstack.flink.sql.util.ParseUtils;
import com.google.common.collect.Lists;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
Expand All @@ -21,6 +24,22 @@ public KuduAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI

@Override
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
KuduSideTableInfo kuduSideTableInfo = (KuduSideTableInfo) sideTableInfo;

String sideTableName = joinInfo.getSideTableName();

SqlNode conditionNode = joinInfo.getCondition();

List<SqlNode> sqlNodeList = Lists.newArrayList();
ParseUtils.parseAnd(conditionNode, sqlNodeList);

for (SqlNode sqlNode : sqlNodeList) {
dealOneEqualCon(sqlNode, sideTableName);
}

sqlCondition = "select ${selectField} from ${tableName} ";
sqlCondition = sqlCondition.replace("${tableName}", kuduSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
}
});
try {
latch.wait();
latch.await();
} catch (InterruptedException e) {
LOG.error("", e);
}
if(!finishFlag.get()){
try {
Thread.sleep(100);
Thread.sleep(3000);
} catch (Exception e){
LOG.error("", e);
}
Expand Down

0 comments on commit 7abe4df

Please sign in to comment.