Commit
Merge remote-tracking branch 'origin/v1.8.0_dev' into feat_1.8_asyncException_mergeDev

# Conflicts:
#	cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java
#	core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java
#	core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java
#	core/src/main/java/com/dtstack/flink/sql/table/AbstractSideTableParser.java
#	rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java
#	redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
dapeng committed Apr 8, 2020
2 parents caa8824 + fe59047 commit f05e92b
Showing 119 changed files with 3,027 additions and 1,827 deletions.
10 changes: 10 additions & 0 deletions .gitlab-ci.yml
@@ -0,0 +1,10 @@
build:
  stage: test
  script:
    - mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
    - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
    - sh ci/sonar_notify.sh
  only:
    - v1.8.0_dev
  tags:
    - dt-insight-engine
24 changes: 1 addition & 23 deletions README.md
@@ -7,34 +7,11 @@
> > * Supports all of the native Flink SQL syntax
> > * Exposes input and output performance metrics to Prometheus
## New features:
* 1. Kafka source tables support the NOT NULL syntax and string-to-timestamp conversion.
* 2. RDB dimension tables ping the database periodically to keep the connection alive; the RDB sink checks the connection before writing.
* 3. Async dimension tables support non-equi joins, e.g. <>, <, >.
* 4. Added Kafka array parsing.
* 5. Added support for Kafka versions 1.0 and above.
* 6. Added PostgreSQL, Kudu, and ClickHouse dimension-table and result-table support.
* 7. Plugins can be loaded as dependencies; see the pluginLoadMode parameter.
* 8. Added CEP support.
* 9. Added UDAF support.
* 10. Added predicate pushdown.
* 11. Added state TTL support.

## Bug fixes:
* 1. Fixed parsing of ORDER BY and UNION syntax in SQL.
* 2. Fixed the exception that caused submission failures in yarnPer mode.
* 3. Miscellaneous bug fixes.

# Supported
* Source tables: Kafka 0.9, 0.10, 0.11, 1.x
* Dimension tables: MySQL, SQLServer, Oracle, HBase, MongoDB, Redis, Cassandra, ServerSocket, Kudu, PostgreSQL, ClickHouse, Impala, DB2
* Result tables: MySQL, SQLServer, Oracle, HBase, Elasticsearch 5.x, MongoDB, Redis, Cassandra, console, Kudu, PostgreSQL, ClickHouse, Impala, DB2

# Roadmap
* Dimension table snapshots
* Kafka Avro format
* TopN

## 1 Quick start
### 1.1 Run modes

@@ -205,6 +182,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
* [Impala result table plugin](docs/impalaSink.md)
* [DB2 result table plugin](docs/db2Sink.md)
* [SQLServer result table plugin](docs/sqlserverSink.md)
* [Kafka result table plugin](docs/kafkaSink.md)

### 2.3 Dimension table plugins
* [HBase dimension table plugin](docs/hbaseSide.md)
@@ -267,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
+ ",pwd:" + tableInfo.getPassword();
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
Thread.sleep(5 * 1000);
Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
} catch (InterruptedException e1) {
LOG.error("", e1);
}
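The hunk above replaces the literal `5 * 1000` with the named `LOAD_DATA_ERROR_SLEEP_TIME` constant in the dimension-table reload retry path. A minimal, self-contained sketch of that retry shape follows; the helper class and the constant's value (assumed to stay at 5 seconds, matching the literal it replaces) are illustrative, not the actual plugin code.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SideTableReloadSketch {

    private static final Logger LOG = LoggerFactory.getLogger(SideTableReloadSketch.class);
    // Assumed to be 5 seconds, matching the literal the commit replaces.
    private static final long LOAD_DATA_ERROR_SLEEP_TIME = 5 * 1000L;

    /** Keep retrying the connection, sleeping between attempts instead of failing hard. */
    public static Connection connectWithRetry(String jdbcUrl, String user, String pwd) throws InterruptedException {
        while (true) {
            try {
                return DriverManager.getConnection(jdbcUrl, user, pwd);
            } catch (SQLException e) {
                LOG.warn("get conn fail, wait for 5 sec and try again, url:{}", jdbcUrl, e);
                Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
            }
        }
    }
}
```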
@@ -169,6 +169,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
connCassandraDB(cassandraSideTableInfo);

String sqlCondition = sideInfo.getSqlCondition() + " " + buildWhereCondition(inputParams) + " ALLOW FILTERING ";
LOG.info("sqlCondition:{}" + sqlCondition);

ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
new AsyncFunction<Session, ResultSet>() {
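The Cassandra hunk above builds the lookup on Guava's ListenableFuture chain (`Futures.transformAsync` over an asynchronously obtained Session). A minimal, self-contained sketch of the same async-lookup shape is below; the `addCallback` form and the helper class are illustrative simplifications, not the actual CassandraAsyncReqRow code.

```java
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

public class CassandraAsyncLookupSketch {

    /** Fire the side-table query asynchronously and hand the rows to a callback. */
    public static void lookup(Session session, String sqlCondition, FutureCallback<ResultSet> callback) {
        // executeAsync returns a ResultSetFuture, which is a ListenableFuture<ResultSet>.
        ListenableFuture<ResultSet> resultSet = session.executeAsync(sqlCondition);
        // The commit chains this with Futures.transformAsync on a Session future;
        // addCallback is the simpler shape of the same idea.
        Futures.addCallback(resultSet, callback, MoreExecutors.directExecutor());
    }
}
```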
@@ -30,6 +30,8 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

@@ -42,6 +44,8 @@
public class CassandraAsyncSideInfo extends BaseSideInfo {

private static final long serialVersionUID = -4403313049809013362L;
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());


public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
@@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInf
}

sqlCondition = "select ${selectField} from ${tableName}";

sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
System.out.println("---------side_exe_sql-----\n" + sqlCondition);

LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
}


14 changes: 14 additions & 0 deletions ci/sonar_notify.sh
@@ -0,0 +1,14 @@
#!/bin/bash
# See the DingTalk docs: https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
-H "Content-Type: application/json" \
-d "{
\"msgtype\": \"markdown\",
\"markdown\": {
\"title\":\"sonar代码质量\",
\"text\": \"## sonar代码质量报告: \n
> [sonar地址](http://172.16.100.198:9000/dashboard?id=dt-insight-engine/flinkStreamSQL) \n
> ${sonarreport} \n\"
}
}"
@@ -48,14 +48,18 @@ public void open(Configuration parameters) throws Exception {
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
clickhouseClientConfig.put("url", rdbSideTableInfo.getUrl())
.put("driver_class", CLICKHOUSE_DRIVER)
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
.put("user", rdbSideTableInfo.getUserName())
.put("password", rdbSideTableInfo.getPassword())
.put("provider_class", DT_PROVIDER_CLASS);
.put("provider_class", DT_PROVIDER_CLASS)
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);

System.setProperty("vertx.disableFileCPResolving", "true");
VertxOptions vo = new VertxOptions();
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
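The hunk above sizes the Vert.x JDBC pool from the table's asyncPoolSize and adds C3P0-style keepalive options. A minimal sketch of the resulting client construction is below; the concrete values (driver class, test query, idle test period) are illustrative assumptions, since the real constants are defined elsewhere in the plugin.

```java
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;

public class ClickhouseAsyncClientSketch {

    public static JDBCClient build(String url, String user, String pwd, int asyncPoolSize) {
        JsonObject config = new JsonObject()
                .put("url", url)
                .put("driver_class", "ru.yandex.clickhouse.ClickHouseDriver") // assumed; the plugin uses its CLICKHOUSE_DRIVER constant
                .put("max_pool_size", asyncPoolSize)                          // pool sized from asyncPoolSize, as in the hunk
                .put("user", user)
                .put("password", pwd)
                // the real plugin also sets "provider_class" to a DT-specific C3P0 provider
                .put("preferred_test_query", "select 1")                      // assumed keepalive query
                .put("idle_connection_test_period", 60)                       // assumed seconds between idle tests
                .put("test_connection_on_checkin", true);

        VertxOptions vo = new VertxOptions();
        vo.setEventLoopPoolSize(asyncPoolSize);
        vo.setWorkerPoolSize(asyncPoolSize);
        vo.setFileResolverCachingEnabled(false);
        return JDBCClient.createNonShared(Vertx.vertx(vo), config);
    }
}
```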
@@ -56,7 +56,7 @@ public void open(int taskNumber, int numTasks) throws IOException {

@Override
public void writeRecord(Tuple2 tuple2) throws IOException {
System.out.println("received oriainal data:" + tuple2);
LOG.info("received oriainal data:{}" + tuple2);
Tuple2<Boolean, Row> tupleTrans = tuple2;
Boolean retract = tupleTrans.getField(0);
if (!retract) {
@@ -95,7 +95,7 @@ public static <T> TablePrintUtil build(List<T> data) {
try {
value = obj.getClass().getMethod(colList.get(j).getMethodName).invoke(data.get(i)).toString();
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
e.printStackTrace();
LOG.error("", e);
}
item[j] = value == null ? "null" : value;
}
@@ -113,14 +113,13 @@ public JobExecutionResult execute(String jobName) throws Exception {
LOG.info("Running job on local embedded Flink mini cluster");
}

MiniCluster exec = new MiniCluster(configBuilder.build());
try {
try (MiniCluster exec = new MiniCluster(configBuilder.build());) {
exec.start();
return exec.executeJobBlocking(jobGraph);
}
finally {
JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
transformations.clear();
exec.closeAsync();
return jobExecutionResult;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@@ -95,11 +95,8 @@ public class ExecuteProcessHelper {

public static ParamsInfo parseParams(String[] args) throws Exception {
LOG.info("------------program params-------------------------");
System.out.println("------------program params-------------------------");
Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
Arrays.stream(args).forEach(System.out::println);
LOG.info("-------------------------------------------");
System.out.println("----------------------------------------");

OptionParser optionParser = new OptionParser(args);
Options options = optionParser.getOptions();
@@ -206,7 +203,6 @@ private static void sqlTranslation(String localSqlPluginPath,
if (LOG.isInfoEnabled()) {
LOG.info("exe-sql:\n" + result.getExecSql());
}

boolean isSide = false;
for (String tableName : result.getTargetTableList()) {
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -228,12 +224,10 @@
//sql-dimensional table contains the dimension table of execution
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
} else {
System.out.println("----------exec sql without dimension join-----------");
System.out.println("----------real sql exec is--------------------------");
System.out.println(result.getExecSql());
LOG.info("----------exec sql without dimension join-----------");
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
if (LOG.isInfoEnabled()) {
System.out.println();
LOG.info("exec sql: " + result.getExecSql());
}
}
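Several hunks in this commit replace System.out.println with SLF4J calls. When the `{}` placeholder is used, the value must be passed as a separate argument; concatenating with `+` defeats the placeholder and always builds the string. A small illustrative sketch (the class and method names are hypothetical):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingStyleSketch {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingStyleSketch.class);

    public static void logExecSql(String execSql) {
        // Parameterized form: the {} placeholder is bound to a separate argument,
        // so the message is only assembled when INFO is enabled.
        LOG.info("exec-sql:\n{}", execSql);

        // Explicit guard, useful when the argument itself is expensive to build.
        if (LOG.isInfoEnabled()) {
            LOG.info("exec-sql:\n{}", execSql.trim());
        }
    }
}
```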
@@ -355,6 +349,7 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
return env;
}


public static void setLogLevel(ParamsInfo paramsInfo){
String logLevel = paramsInfo.getConfProp().getProperty(ConfigConstrant.LOG_LEVEL_KEY);
if(org.apache.commons.lang3.StringUtils.isBlank(logLevel)){
@@ -24,7 +24,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.types.Row;
import org.apache.flink.table.runtime.types.CRow;


/**
@@ -34,11 +34,11 @@
* author: toutian
* create: 2019/12/24
*/
public class SerializationMetricWrapper implements SerializationSchema<Row> {
public class SerializationMetricWrapper implements SerializationSchema<CRow> {

private static final long serialVersionUID = 1L;

private SerializationSchema<Row> serializationSchema;
private SerializationSchema<CRow> serializationSchema;

private transient RuntimeContext runtimeContext;

@@ -47,7 +47,7 @@ public class SerializationMetricWrapper implements SerializationSchema<Row> {
protected transient Meter dtNumRecordsOutRate;


public SerializationMetricWrapper(SerializationSchema<Row> serializationSchema) {
public SerializationMetricWrapper(SerializationSchema<CRow> serializationSchema) {
this.serializationSchema = serializationSchema;
}

@@ -57,7 +57,7 @@ public void initMetric() {
}

@Override
public byte[] serialize(Row element) {
public byte[] serialize(CRow element) {
beforeSerialize();
byte[] row = serializationSchema.serialize(element);
afterSerialize();
@@ -79,7 +79,7 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
}

public SerializationSchema<Row> getSerializationSchema() {
public SerializationSchema<CRow> getSerializationSchema() {
return serializationSchema;
}

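This file's change re-types the metric wrapper from SerializationSchema&lt;Row&gt; to SerializationSchema&lt;CRow&gt;. As a rough sketch of how a CRow-typed schema can delegate to an existing Row schema (the adapter class is an illustration, not code from this commit, and the import locations are the usual Flink 1.8-era ones, which may differ from the project's):

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

public class CRowSerializationAdapter implements SerializationSchema<CRow> {

    private static final long serialVersionUID = 1L;

    private final SerializationSchema<Row> rowSchema;

    public CRowSerializationAdapter(SerializationSchema<Row> rowSchema) {
        this.rowSchema = rowSchema;
    }

    @Override
    public byte[] serialize(CRow element) {
        // CRow wraps a Row plus a retract flag; only the payload Row is serialized here.
        return rowSchema.serialize(element.row());
    }
}
```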
@@ -28,8 +28,9 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.types.Row;

import java.io.IOException;
21 changes: 7 additions & 14 deletions core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java
@@ -29,9 +29,10 @@
import java.util.List;
import java.util.Map;
import java.io.File;
import java.io.FileInputStream;
import java.net.URLEncoder;

import org.apache.commons.codec.Charsets;
import org.apache.flink.util.FileUtils;


/**
@@ -92,29 +93,21 @@ public Options getOptions(){
}

public List<String> getProgramExeArgList() throws Exception {
Map<String,Object> mapConf = PluginUtil.objectToMap(properties);
Map<String, Object> mapConf = PluginUtil.objectToMap(properties);
List<String> args = Lists.newArrayList();
for(Map.Entry<String, Object> one : mapConf.entrySet()){
for (Map.Entry<String, Object> one : mapConf.entrySet()) {
String key = one.getKey();
Object value = one.getValue();
if(value == null){
if (value == null) {
continue;
}else if(OPTION_SQL.equalsIgnoreCase(key)){
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
File file = new File(value.toString());
FileInputStream in = new FileInputStream(file);
byte[] filecontent = new byte[(int) file.length()];
in.read(filecontent);
String content = new String(filecontent, Charsets.UTF_8.name());
String content = FileUtils.readFile(file, "UTF-8");
value = URLEncoder.encode(content, Charsets.UTF_8.name());
}
args.add("-" + key);
args.add(value.toString());
}
return args;
}

public static void main(String[] args) throws Exception {
OptionParser optionParser = new OptionParser(args);
System.out.println(optionParser.getOptions());
}
}
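getProgramExeArgList now reads the SQL file with Flink's FileUtils and URL-encodes the content before passing it as a program argument. A self-contained sketch of that round trip is below; the decode side is an assumption about how the consumer reverses the encoding, not code from this commit.

```java
import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.apache.flink.util.FileUtils;

public class SqlArgEncodingSketch {

    /** Read the SQL file and URL-encode it so it survives being passed as a single argument. */
    public static String encodeSqlArg(String path) throws IOException {
        String content = FileUtils.readFile(new File(path), StandardCharsets.UTF_8.name());
        return URLEncoder.encode(content, StandardCharsets.UTF_8.name());
    }

    /** Assumed inverse on the consuming side. */
    public static String decodeSqlArg(String arg) throws IOException {
        return URLDecoder.decode(arg, StandardCharsets.UTF_8.name());
    }
}
```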
@@ -22,7 +22,7 @@

import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import com.google.common.collect.Lists;
@@ -21,7 +21,14 @@
package com.dtstack.flink.sql.parser;

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlMatchRecognize;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.commons.lang3.StringUtils;