Skip to content

Commit c81614e

Browse files
author
dapeng
committed
Merge branch 'hotfix_1.8_3.10.x_26287' into 1.8_test_3.10.x
2 parents 2e0c6c7 + eb0ae65 commit c81614e

File tree

8 files changed

+17
-16
lines changed

8 files changed

+17
-16
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public void open(Configuration parameters) throws Exception {
6767
vo.setFileResolverCachingEnabled(false);
6868
Vertx vertx = Vertx.vertx(vo);
6969
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
70-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
71-
new LinkedBlockingQueue<>(10), new DTThreadFactory("clickhouseAsyncExec")));
70+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
71+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("clickhouseAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
7272
}
7373

7474
}

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setFileResolverCachingEnabled(false);
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
79-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10), new DTThreadFactory("dbAsyncExec")));
79+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("dbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
81+
8182
}
8283

8384
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void open(Configuration parameters) throws Exception {
8383
vo.setFileResolverCachingEnabled(false);
8484
Vertx vertx = Vertx.vertx(vo);
8585
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
86-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
87-
new LinkedBlockingQueue<>(10), new DTThreadFactory("impalaAsyncExec")));
86+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
87+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("impalaAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8888
}
8989

9090

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setFileResolverCachingEnabled(false);
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
79-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10), new DTThreadFactory("mysqlAsyncExec")));
79+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("mysqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8181
}
8282

8383
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void open(Configuration parameters) throws Exception {
6868
vo.setFileResolverCachingEnabled(false);
6969
Vertx vertx = Vertx.vertx(vo);
7070
setRdbSqlClient(JDBCClient.createNonShared(vertx, oracleClientConfig));
71-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
72-
new LinkedBlockingQueue<>(10), new DTThreadFactory("oracleAsyncExec")));
71+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
72+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("oracleAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
7373
}
7474
}

polardb/polardb-side/polardb-async-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void open(Configuration parameters) throws Exception {
7575
vo.setFileResolverCachingEnabled(false);
7676
Vertx vertx = Vertx.vertx(vo);
7777
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
78-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
79-
new LinkedBlockingQueue<>(10), new DTThreadFactory("polardbAsyncExec")));
78+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
79+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("polardbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8080
}
8181
}

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, pgClientConfig));
79-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10), new DTThreadFactory("postgresqlAsyncExec")));
79+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("postgresqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8181
}
8282

8383
}

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void open(Configuration parameters) throws Exception {
7878
vo.setFileResolverCachingEnabled(false);
7979
Vertx vertx = Vertx.vertx(vo);
8080
setRdbSqlClient(JDBCClient.createNonShared(vertx, sqlserverClientConfig));
81-
setExecutor(new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
82-
new LinkedBlockingQueue<>(10), new DTThreadFactory("sqlServerAsyncExec")));
81+
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
82+
new LinkedBlockingQueue<>(10000), new DTThreadFactory("sqlServerAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8383
}
8484
}

0 commit comments

Comments
 (0)