Skip to content

Commit

Permalink
Merge branch 'hotfix_1.8_3.10.x_26972' into 1.8_release_3.10.x
Browse files Browse the repository at this point in the history
  • Loading branch information
gituser committed Jun 22, 2020
2 parents bf1f766 + 16398a1 commit 76b10af
Show file tree
Hide file tree
Showing 11 changed files with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("clickhouseAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.dtstack.flink.sql.enums.ECacheContentType;
import com.dtstack.flink.sql.enums.ECacheType;
import com.dtstack.flink.sql.factory.DTThreadFactory;
import com.dtstack.flink.sql.metric.MetricConstant;
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
import com.dtstack.flink.sql.side.cache.CacheObj;
Expand Down Expand Up @@ -68,6 +69,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
private int timeOutNum = 0;
protected BaseSideInfo sideInfo;
protected transient Counter parseErrorRecords;
private transient ThreadPoolExecutor cancelExecutor;

public BaseAsyncReqRow(BaseSideInfo sideInfo){
this.sideInfo = sideInfo;
Expand All @@ -82,6 +84,8 @@ public void open(Configuration parameters) throws Exception {
super.open(parameters);
initCache();
initMetric();
cancelExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100000),
new DTThreadFactory("cancel-timer-executor"));
LOG.info("async dim table config info: {} ", sideInfo.getSideTableInfo().toString());
}

Expand Down Expand Up @@ -248,12 +252,11 @@ public void onProcessingTime(long timestamp) throws Exception {
}

protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, ScheduledFuture<?> timerFuture){
ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
if(resultFuture instanceof StreamRecordQueueEntry){
StreamRecordQueueEntry streamRecordBufferEntry = (StreamRecordQueueEntry) resultFuture;
streamRecordBufferEntry.onComplete((Object value) -> {
timerFuture.cancel(true);
},executors);
}, cancelExecutor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("dbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("impalaAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static void testClasspathMode() throws Exception {


public static void testRocSql() throws Exception{
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/zy_sql/hbase_side.sql", "-name", "roc",
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/impala_sink.sql", "-name", "roc",
"-localSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
"-remoteSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
"-flinkconf", "/Users/roc/Documents/flink_sql/flinkconf",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("mysqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,5 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, oracleClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("oracleAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,5 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("polardbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, pgClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("postgresqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
Expand Down Expand Up @@ -86,6 +87,15 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {

private transient ThreadPoolExecutor executor;

private final static int MAX_TASK_QUEUE_SIZE = 100000;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor = new ThreadPoolExecutor(MAX_DB_CONN_POOL_SIZE_LIMIT, MAX_DB_CONN_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("rdbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy());
}

public RdbAsyncReqRow(BaseSideInfo sideInfo) {
super(sideInfo);
init(sideInfo);
Expand Down Expand Up @@ -248,10 +258,6 @@ public void setRdbSqlClient(SQLClient rdbSqlClient) {
this.rdbSqlClient = rdbSqlClient;
}

public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}

private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture){
String key = buildCacheKey(inputParams);
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,5 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, sqlserverClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("sqlServerAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}
}

0 comments on commit 76b10af

Please sign in to comment.