Skip to content

Commit 16398a1

Browse files
author
dapeng
committed
统一执行线程池,调整参数
1 parent b1b56ad commit 16398a1

File tree

10 files changed

+11
-22
lines changed

10 files changed

+11
-22
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ public void open(Configuration parameters) throws Exception {
6767
vo.setFileResolverCachingEnabled(false);
6868
Vertx vertx = Vertx.vertx(vo);
6969
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
70-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
71-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("clickhouseAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
7270
}
7371

7472
}

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

-3
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setFileResolverCachingEnabled(false);
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
79-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("dbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
81-
8279
}
8380

8481
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ public void open(Configuration parameters) throws Exception {
8383
vo.setFileResolverCachingEnabled(false);
8484
Vertx vertx = Vertx.vertx(vo);
8585
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
86-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
87-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("impalaAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8886
}
8987

9088

launcher/src/main/test/java/com/dtstack/flink/sql/launcher/PluginLoadModeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static void testClasspathMode() throws Exception {
5050

5151

5252
public static void testRocSql() throws Exception{
53-
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/zy_sql/hbase_side.sql", "-name", "roc",
53+
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/impala_sink.sql", "-name", "roc",
5454
"-localSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
5555
"-remoteSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
5656
"-flinkconf", "/Users/roc/Documents/flink_sql/flinkconf",

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setFileResolverCachingEnabled(false);
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
79-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("mysqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8179
}
8280

8381
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,5 @@ public void open(Configuration parameters) throws Exception {
6868
vo.setFileResolverCachingEnabled(false);
6969
Vertx vertx = Vertx.vertx(vo);
7070
setRdbSqlClient(JDBCClient.createNonShared(vertx, oracleClientConfig));
71-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
72-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("oracleAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
7371
}
7472
}

polardb/polardb-side/polardb-async-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,5 @@ public void open(Configuration parameters) throws Exception {
7575
vo.setFileResolverCachingEnabled(false);
7676
Vertx vertx = Vertx.vertx(vo);
7777
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
78-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
79-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("polardbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8078
}
8179
}

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, pgClientConfig));
79-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("postgresqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8179
}
8280

8381
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.vertx.ext.sql.SQLClient;
3636
import io.vertx.ext.sql.SQLConnection;
3737
import org.apache.commons.lang3.StringUtils;
38+
import org.apache.flink.configuration.Configuration;
3839
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3940
import org.apache.flink.table.runtime.types.CRow;
4041
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -86,6 +87,15 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8687

8788
private transient ThreadPoolExecutor executor;
8889

90+
private final static int MAX_TASK_QUEUE_SIZE = 100000;
91+
92+
@Override
93+
public void open(Configuration parameters) throws Exception {
94+
super.open(parameters);
95+
executor = new ThreadPoolExecutor(MAX_DB_CONN_POOL_SIZE_LIMIT, MAX_DB_CONN_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
96+
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("rdbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy());
97+
}
98+
8999
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
90100
super(sideInfo);
91101
init(sideInfo);
@@ -248,10 +258,6 @@ public void setRdbSqlClient(SQLClient rdbSqlClient) {
248258
this.rdbSqlClient = rdbSqlClient;
249259
}
250260

251-
public void setExecutor(ThreadPoolExecutor executor) {
252-
this.executor = executor;
253-
}
254-
255261
private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture){
256262
String key = buildCacheKey(inputParams);
257263
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,5 @@ public void open(Configuration parameters) throws Exception {
7878
vo.setFileResolverCachingEnabled(false);
7979
Vertx vertx = Vertx.vertx(vo);
8080
setRdbSqlClient(JDBCClient.createNonShared(vertx, sqlserverClientConfig));
81-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
82-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("sqlServerAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8381
}
8482
}

0 commit comments

Comments
 (0)