Skip to content

Commit

Permalink
Merge branch '1.8_test_3.10.x' into 1.8_release_3.10.x_mergedTest_new
Browse files Browse the repository at this point in the history
# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
#	core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java
  • Loading branch information
xuchao committed May 11, 2020
2 parents 85aae47 + 9fe68ea commit 07c4ccd
Show file tree
Hide file tree
Showing 42 changed files with 143 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
+ ",pwd:" + tableInfo.getPassword();
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
Thread.sleep(5 * 1000);
Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
} catch (InterruptedException e1) {
LOG.error("", e1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,13 @@ public JobExecutionResult execute(String jobName) throws Exception {
LOG.info("Running job on local embedded Flink mini cluster");
}

MiniCluster exec = new MiniCluster(configBuilder.build());
try {
try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
exec.start();
return exec.executeJobBlocking(jobGraph);
}
finally {
JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
transformations.clear();
exec.closeAsync();
return jobExecutionResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
*/
public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
String remoteSqlPluginPath, String pluginLoadMode, Map<String, AbstractSideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
Set<URL> pluginClassPatshSets = Sets.newHashSet();
Set<URL> pluginClassPathSets = Sets.newHashSet();
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
for (AbstractTableInfo tableInfo : sqlTree.getTableInfoMap().values()) {

Expand Down Expand Up @@ -309,26 +309,26 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
registerTableCache.put(tableInfo.getName(), regTable);

URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractSourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
pluginClassPatshSets.add(sourceTablePathUrl);
pluginClassPathSets.add(sourceTablePathUrl);
} else if (tableInfo instanceof AbstractTargetTableInfo) {

TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath);
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);

URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
pluginClassPatshSets.add(sinkTablePathUrl);
pluginClassPathSets.add(sinkTablePathUrl);
} else if (tableInfo instanceof AbstractSideTableInfo) {
String sideOperator = ECacheType.ALL.name().equals(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
sideTableMap.put(tableInfo.getName(), (AbstractSideTableInfo) tableInfo);

URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, AbstractSideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
pluginClassPatshSets.add(sideTablePathUrl);
pluginClassPathSets.add(sideTablePathUrl);
} else {
throw new RuntimeException("not support table type:" + tableInfo.getType());
}
}
return pluginClassPatshSets;
return pluginClassPathSets;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import java.util.List;
import java.util.Map;
import java.io.File;
import java.io.FileInputStream;
import java.net.URLEncoder;
import java.util.stream.Stream;

import org.apache.commons.codec.Charsets;
import org.apache.flink.util.FileUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
package com.dtstack.flink.sql.parser;

import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.calcite.sql.*;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import com.google.common.collect.Lists;
import org.apache.flink.table.calcite.FlinkPlannerImpl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
package com.dtstack.flink.sql.parser;

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlMatchRecognize;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements

public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";

private String cacheType = "none";//None or LRU or ALL
private String cacheType = "none";

private int cacheSize = 10000;

private long cacheTimeout = 60 * 1000;//
private long cacheTimeout = 60_000L;

private int asyncCapacity=100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> impl

private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);

public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;

protected BaseSideInfo sideInfo;

private ScheduledExecutorService es;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.base.Strings;

import java.io.Serializable;
import java.util.Map;

/**
* Join信息
Expand Down
Loading

0 comments on commit 07c4ccd

Please sign in to comment.