Skip to content

Commit

Permalink
回滚 5f30a74 ,解决 '回滚导致的test分支变更内容'无法提交的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
xuchao committed May 11, 2020
1 parent b59ef71 commit 85aae47
Show file tree
Hide file tree
Showing 256 changed files with 3,487 additions and 2,621 deletions.
10 changes: 10 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
build:
stage: test
script:
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.branch.name="v1.8.0_dev" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
- sh ci/sonar_notify.sh
only:
- v1.8.0_dev
tags:
- dt-insight-engine
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.side.AllReqRow;
import com.dtstack.flink.sql.side.BaseAllReqRow;
import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -62,14 +62,12 @@
*
* @author xuqianjin
*/
public class CassandraAllReqRow extends AllReqRow {
public class CassandraAllReqRow extends BaseAllReqRow {

private static final long serialVersionUID = 54015343561288219L;

private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);

private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";

private static final int CONN_RETRY_NUM = 3;

private static final int FETCH_SIZE = 1000;
Expand All @@ -79,7 +77,7 @@ public class CassandraAllReqRow extends AllReqRow {

private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();

public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.BaseSideInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.SqlNode;
Expand All @@ -37,16 +37,16 @@
*
* @author xuqianjin
*/
public class CassandraAllSideInfo extends SideInfo {
public class CassandraAllSideInfo extends BaseSideInfo {

private static final long serialVersionUID = -8690814317653033557L;

public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}

@Override
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;

sqlCondition = "select ${selectField} from ${tableName} ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.enums.ECacheContentType;
import com.dtstack.flink.sql.side.AsyncReqRow;
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
import com.dtstack.flink.sql.side.CacheMissVal;
import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cache.CacheObj;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.google.common.base.Function;
Expand All @@ -67,7 +67,7 @@
*
* @author xuqianjin
*/
public class CassandraAsyncReqRow extends AsyncReqRow {
public class CassandraAsyncReqRow extends BaseAsyncReqRow {

private static final long serialVersionUID = 6631584128079864735L;

Expand All @@ -83,7 +83,7 @@ public class CassandraAsyncReqRow extends AsyncReqRow {
private transient ListenableFuture session;
private transient CassandraSideTableInfo cassandraSideTableInfo;

public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
}

Expand Down Expand Up @@ -216,7 +216,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
connCassandraDB(cassandraSideTableInfo);

String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
System.out.println("sqlCondition:" + sqlCondition);
LOG.info("sqlCondition:{}" + sqlCondition);

ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
new AsyncFunction<Session, ResultSet>() {
Expand Down Expand Up @@ -265,7 +265,6 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
public void onFailure(Throwable t) {
LOG.error("Failed to retrieve the data: %s%n",
t.getMessage());
System.out.println("Failed to retrieve the data: " + t.getMessage());
cluster.closeAsync();
resultFuture.completeExceptionally(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.BaseSideInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.SqlBasicCall;
Expand All @@ -30,6 +30,8 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

Expand All @@ -39,16 +41,18 @@
*
* @author xuqianjin
*/
public class CassandraAsyncSideInfo extends SideInfo {
public class CassandraAsyncSideInfo extends BaseSideInfo {

private static final long serialVersionUID = -4403313049809013362L;
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());

public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {

public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}

@Override
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;

String sideTableName = joinInfo.getSideTableName();
Expand All @@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
}

sqlCondition = "select ${selectField} from ${tableName}";

sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
System.out.println("---------side_exe_sql-----\n" + sqlCondition);

LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,24 @@

package com.dtstack.flink.sql.side.cassandra.table;

import com.dtstack.flink.sql.table.AbsSideTableParser;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.AbstractSideTableParser;
import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.util.MathUtil;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;

/**
* Reason:
* Date: 2018/11/22
*
* @author xuqianjin
*/
public class CassandraSideParser extends AbsSideTableParser {
public class CassandraSideParser extends AbstractSideTableParser {

private final static String SIDE_SIGN_KEY = "sideSignKey";

Expand Down Expand Up @@ -73,7 +71,7 @@ public CassandraSideParser() {
}

@Override
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo cassandraSideTableInfo = new com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo();
cassandraSideTableInfo.setName(tableName);
parseFieldsInfo(fieldsInfo, cassandraSideTableInfo);
Expand All @@ -96,9 +94,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
return cassandraSideTableInfo;
}

private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
private void dealSideSign(Matcher matcher, AbstractTableInfo tableInfo) {
}

@Override
public Class dbTypeConvertToJavaType(String fieldType) {
switch (fieldType.toLowerCase()) {
case "bigint":
Expand All @@ -121,6 +120,8 @@ public Class dbTypeConvertToJavaType(String fieldType) {
return Double.class;
case "timestamp":
return Timestamp.class;
default:
break;
}

throw new RuntimeException("不支持 " + fieldType + " 类型");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package com.dtstack.flink.sql.side.cassandra.table;

import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.google.common.base.Preconditions;

/**
Expand All @@ -28,7 +28,7 @@
*
* @author xuqianjin
*/
public class CassandraSideTableInfo extends SideTableInfo {
public class CassandraSideTableInfo extends AbstractSideTableInfo {

private static final long serialVersionUID = -5556431094535478915L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -72,7 +72,7 @@
* @see Tuple
* @see DriverManager
*/
public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
public class CassandraOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
private static final long serialVersionUID = -7994311331389155692L;

private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
import com.dtstack.flink.sql.table.TargetTableInfo;
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand Down Expand Up @@ -63,7 +63,7 @@ public CassandraSink() {
}

@Override
public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
CassandraTableInfo cassandraTableInfo = (CassandraTableInfo) targetTableInfo;
this.address = cassandraTableInfo.getAddress();
this.tableName = cassandraTableInfo.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@

package com.dtstack.flink.sql.sink.cassandra.table;

import com.dtstack.flink.sql.table.AbsTableParser;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.AbstractTableParser;
import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.util.MathUtil;

import java.util.Map;

import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;

/**
* Reason:
* Date: 2018/11/22
*
* @author xuqianjin
*/
public class CassandraSinkParser extends AbsTableParser {
public class CassandraSinkParser extends AbstractTableParser {

public static final String ADDRESS_KEY = "address";

Expand All @@ -60,7 +60,7 @@ public class CassandraSinkParser extends AbsTableParser {
public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";

@Override
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
CassandraTableInfo cassandraTableInfo = new CassandraTableInfo();
cassandraTableInfo.setName(tableName);
parseFieldsInfo(fieldsInfo, cassandraTableInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package com.dtstack.flink.sql.sink.cassandra.table;

import com.dtstack.flink.sql.table.TargetTableInfo;
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
import com.google.common.base.Preconditions;

/**
Expand All @@ -28,7 +28,7 @@
*
* @author xuqianjin
*/
public class CassandraTableInfo extends TargetTableInfo {
public class CassandraTableInfo extends AbstractTargetTableInfo {

private static final String CURR_TYPE = "cassandra";

Expand Down
14 changes: 14 additions & 0 deletions ci/sonar_notify.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
#参考钉钉文档 https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
-H "Content-Type: application/json" \
-d "{
\"msgtype\": \"markdown\",
\"markdown\": {
\"title\":\"sonar代码质量\",
\"text\": \"## sonar代码质量报告: \n
> [sonar地址](http://172.16.100.198:9000/dashboard?id=dt-insight-engine/flinkStreamSQL) \n
> ${sonarreport} \n\"
}
}"
Loading

0 comments on commit 85aae47

Please sign in to comment.