22
22
23
23
import com .dtstack .flink .sql .enums .ECacheContentType ;
24
24
import com .dtstack .flink .sql .enums .ECacheType ;
25
+ import com .dtstack .flink .sql .factory .DTThreadFactory ;
25
26
import com .dtstack .flink .sql .metric .MetricConstant ;
26
27
import com .dtstack .flink .sql .side .cache .AbstractSideCache ;
27
28
import com .dtstack .flink .sql .side .cache .CacheObj ;
@@ -68,6 +69,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
68
69
private int timeOutNum = 0 ;
69
70
protected BaseSideInfo sideInfo ;
70
71
protected transient Counter parseErrorRecords ;
72
+ private transient ThreadPoolExecutor cancelExecutor ;
71
73
72
74
public BaseAsyncReqRow (BaseSideInfo sideInfo ){
73
75
this .sideInfo = sideInfo ;
@@ -82,6 +84,8 @@ public void open(Configuration parameters) throws Exception {
82
84
super .open (parameters );
83
85
initCache ();
84
86
initMetric ();
87
+ cancelExecutor = new ThreadPoolExecutor (1 , 1 , 0 , TimeUnit .MILLISECONDS , new LinkedBlockingQueue <>(100000 ),
88
+ new DTThreadFactory ("cancel-timer-executor" ));
85
89
LOG .info ("async dim table config info: {} " , sideInfo .getSideTableInfo ().toString ());
86
90
}
87
91
@@ -248,12 +252,11 @@ public void onProcessingTime(long timestamp) throws Exception {
248
252
}
249
253
250
254
protected void cancelTimerWhenComplete (ResultFuture <CRow > resultFuture , ScheduledFuture <?> timerFuture ){
251
- ThreadPoolExecutor executors = new ThreadPoolExecutor (1 , 1 ,0L , TimeUnit .MILLISECONDS , new LinkedBlockingQueue <Runnable >());
252
255
if (resultFuture instanceof StreamRecordQueueEntry ){
253
256
StreamRecordQueueEntry streamRecordBufferEntry = (StreamRecordQueueEntry ) resultFuture ;
254
257
streamRecordBufferEntry .onComplete ((Object value ) -> {
255
258
timerFuture .cancel (true );
256
- },executors );
259
+ }, cancelExecutor );
257
260
}
258
261
}
259
262
0 commit comments