Skip to content

Commit

Permalink
统一执行线程池,调整参数
Browse files Browse the repository at this point in the history
  • Loading branch information
dapeng committed Jun 22, 2020
1 parent b1b56ad commit 16398a1
Show file tree
Hide file tree
Showing 10 changed files with 11 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("clickhouseAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("dbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("impalaAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static void testClasspathMode() throws Exception {


public static void testRocSql() throws Exception{
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/zy_sql/hbase_side.sql", "-name", "roc",
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/impala_sink.sql", "-name", "roc",
"-localSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
"-remoteSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
"-flinkconf", "/Users/roc/Documents/flink_sql/flinkconf",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("mysqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,5 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, oracleClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("oracleAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,5 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("polardbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, pgClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("postgresqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
Expand Down Expand Up @@ -86,6 +87,15 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {

private transient ThreadPoolExecutor executor;

private final static int MAX_TASK_QUEUE_SIZE = 100000;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor = new ThreadPoolExecutor(MAX_DB_CONN_POOL_SIZE_LIMIT, MAX_DB_CONN_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("rdbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy());
}

public RdbAsyncReqRow(BaseSideInfo sideInfo) {
super(sideInfo);
init(sideInfo);
Expand Down Expand Up @@ -248,10 +258,6 @@ public void setRdbSqlClient(SQLClient rdbSqlClient) {
this.rdbSqlClient = rdbSqlClient;
}

public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}

private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture){
String key = buildCacheKey(inputParams);
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,5 @@ public void open(Configuration parameters) throws Exception {
vo.setFileResolverCachingEnabled(false);
Vertx vertx = Vertx.vertx(vo);
setRdbSqlClient(JDBCClient.createNonShared(vertx, sqlserverClientConfig));
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new DTThreadFactory("sqlServerAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
}
}

0 comments on commit 16398a1

Please sign in to comment.