Skip to content

Commit 76b10af

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_26972' into 1.8_release_3.10.x
2 parents bf1f766 + 16398a1 commit 76b10af

File tree

11 files changed

+16
-24
lines changed

11 files changed

+16
-24
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ public void open(Configuration parameters) throws Exception {
6767
vo.setFileResolverCachingEnabled(false);
6868
Vertx vertx = Vertx.vertx(vo);
6969
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
70-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
71-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("clickhouseAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
7270
}
7371

7472
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.enums.ECacheContentType;
2424
import com.dtstack.flink.sql.enums.ECacheType;
25+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2526
import com.dtstack.flink.sql.metric.MetricConstant;
2627
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
2728
import com.dtstack.flink.sql.side.cache.CacheObj;
@@ -68,6 +69,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
6869
private int timeOutNum = 0;
6970
protected BaseSideInfo sideInfo;
7071
protected transient Counter parseErrorRecords;
72+
private transient ThreadPoolExecutor cancelExecutor;
7173

7274
public BaseAsyncReqRow(BaseSideInfo sideInfo){
7375
this.sideInfo = sideInfo;
@@ -82,6 +84,8 @@ public void open(Configuration parameters) throws Exception {
8284
super.open(parameters);
8385
initCache();
8486
initMetric();
87+
cancelExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100000),
88+
new DTThreadFactory("cancel-timer-executor"));
8589
LOG.info("async dim table config info: {} ", sideInfo.getSideTableInfo().toString());
8690
}
8791

@@ -248,12 +252,11 @@ public void onProcessingTime(long timestamp) throws Exception {
248252
}
249253

250254
protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, ScheduledFuture<?> timerFuture){
251-
ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
252255
if(resultFuture instanceof StreamRecordQueueEntry){
253256
StreamRecordQueueEntry streamRecordBufferEntry = (StreamRecordQueueEntry) resultFuture;
254257
streamRecordBufferEntry.onComplete((Object value) -> {
255258
timerFuture.cancel(true);
256-
},executors);
259+
}, cancelExecutor);
257260
}
258261
}
259262

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

-3
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setFileResolverCachingEnabled(false);
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
79-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("dbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
81-
8279
}
8380

8481
}

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ public void open(Configuration parameters) throws Exception {
8383
vo.setFileResolverCachingEnabled(false);
8484
Vertx vertx = Vertx.vertx(vo);
8585
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
86-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
87-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("impalaAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8886
}
8987

9088

launcher/src/main/test/java/com/dtstack/flink/sql/launcher/PluginLoadModeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static void testClasspathMode() throws Exception {
5050

5151

5252
public static void testRocSql() throws Exception{
53-
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/zy_sql/hbase_side.sql", "-name", "roc",
53+
String[] sql = new String[]{"-mode", "local", "-sql", "/Users/roc/Documents/flink_sql/sql/impala_sink.sql", "-name", "roc",
5454
"-localSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
5555
"-remoteSqlPluginPath", "/Users/roc/workspace/git_code/flinkStreamSQL/plugins",
5656
"-flinkconf", "/Users/roc/Documents/flink_sql/flinkconf",

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setFileResolverCachingEnabled(false);
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
79-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("mysqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8179
}
8280

8381
}

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,5 @@ public void open(Configuration parameters) throws Exception {
6868
vo.setFileResolverCachingEnabled(false);
6969
Vertx vertx = Vertx.vertx(vo);
7070
setRdbSqlClient(JDBCClient.createNonShared(vertx, oracleClientConfig));
71-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
72-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("oracleAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
7371
}
7472
}

polardb/polardb-side/polardb-async-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,5 @@ public void open(Configuration parameters) throws Exception {
7575
vo.setFileResolverCachingEnabled(false);
7676
Vertx vertx = Vertx.vertx(vo);
7777
setRdbSqlClient(JDBCClient.createNonShared(vertx, mysqlClientConfig));
78-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
79-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("polardbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8078
}
8179
}

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ public void open(Configuration parameters) throws Exception {
7676
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
7777
Vertx vertx = Vertx.vertx(vo);
7878
setRdbSqlClient(JDBCClient.createNonShared(vertx, pgClientConfig));
79-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
80-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("postgresqlAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8179
}
8280

8381
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.vertx.ext.sql.SQLClient;
3636
import io.vertx.ext.sql.SQLConnection;
3737
import org.apache.commons.lang3.StringUtils;
38+
import org.apache.flink.configuration.Configuration;
3839
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3940
import org.apache.flink.table.runtime.types.CRow;
4041
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -86,6 +87,15 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8687

8788
private transient ThreadPoolExecutor executor;
8889

90+
private final static int MAX_TASK_QUEUE_SIZE = 100000;
91+
92+
@Override
93+
public void open(Configuration parameters) throws Exception {
94+
super.open(parameters);
95+
executor = new ThreadPoolExecutor(MAX_DB_CONN_POOL_SIZE_LIMIT, MAX_DB_CONN_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
96+
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("rdbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy());
97+
}
98+
8999
public RdbAsyncReqRow(BaseSideInfo sideInfo) {
90100
super(sideInfo);
91101
init(sideInfo);
@@ -248,10 +258,6 @@ public void setRdbSqlClient(SQLClient rdbSqlClient) {
248258
this.rdbSqlClient = rdbSqlClient;
249259
}
250260

251-
public void setExecutor(ThreadPoolExecutor executor) {
252-
this.executor = executor;
253-
}
254-
255261
private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture){
256262
String key = buildCacheKey(inputParams);
257263
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,5 @@ public void open(Configuration parameters) throws Exception {
7878
vo.setFileResolverCachingEnabled(false);
7979
Vertx vertx = Vertx.vertx(vo);
8080
setRdbSqlClient(JDBCClient.createNonShared(vertx, sqlserverClientConfig));
81-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
82-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("sqlServerAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
8381
}
8482
}

0 commit comments

Comments
 (0)