1
1
package com .igeeksky .xtool .core .concurrent ;
2
2
3
+ import java .util .ArrayList ;
3
4
import java .util .concurrent .ExecutionException ;
4
5
import java .util .concurrent .Future ;
5
6
import java .util .concurrent .TimeUnit ;
11
12
*/
12
13
public class Futures {
13
14
15
+ /**
16
+ * 等待所有任务完成(无时间限制)
17
+ *
18
+ * @param futures 任务列表
19
+ */
20
+ public static void awaitAll (Future <?>[] futures ) {
21
+ int i = 0 , len = futures .length ;
22
+ try {
23
+ for (; i < len ; i ++) {
24
+ Future <?> future = futures [i ];
25
+ if (future != null ) {
26
+ if (future .isDone ()) {
27
+ continue ;
28
+ }
29
+ future .get ();
30
+ }
31
+ }
32
+ } catch (InterruptedException | ExecutionException e ) {
33
+ throw new ConcurrentException (e );
34
+ }
35
+ }
36
+
37
+ /**
38
+ * 等待所有任务完成(无时间限制)
39
+ *
40
+ * @param futures 任务列表
41
+ */
42
+ public static void awaitAll (ArrayList <Future <?>> futures ) {
43
+ int i = 0 , len = futures .size ();
44
+ try {
45
+ for (; i < len ; i ++) {
46
+ Future <?> future = futures .get (i );
47
+ if (future != null ) {
48
+ if (future .isDone ()) {
49
+ continue ;
50
+ }
51
+ future .get ();
52
+ }
53
+ }
54
+ } catch (InterruptedException | ExecutionException e ) {
55
+ throw new ConcurrentException (e );
56
+ }
57
+ }
58
+
59
+ /**
60
+ * 等待所有任务完成(有时间限制)
61
+ *
62
+ * @param timeout 超时时间
63
+ * @param unit 时间单位
64
+ * @param start 起始位置,从此位置开始检查任务是否已完成,如未完成,则等待给定的时间
65
+ * @param futures 任务列表
66
+ * @return 剩余未完成任务的起始位置
67
+ */
14
68
public static int awaitAll (long timeout , TimeUnit unit , int start , Future <?>[] futures ) {
15
69
int i = start , len = futures .length ;
16
70
try {
17
71
long nanos = unit .toNanos (timeout );
18
- long time = System .nanoTime ();
72
+ long endTime = System .nanoTime () + nanos ;
19
73
20
74
for (; i < len ; i ++) {
21
75
if (nanos < 0 ) {
22
76
return i ;
23
77
}
24
78
25
79
Future <?> future = futures [i ];
26
-
27
80
if (future != null ) {
28
81
if (future .isDone ()) {
29
82
continue ;
30
83
}
31
-
32
84
future .get (nanos , TimeUnit .NANOSECONDS );
85
+ nanos = endTime - System .nanoTime ();
86
+ }
87
+ }
33
88
34
- long now = System .nanoTime ();
35
- nanos -= now - time ;
36
- time = now ;
89
+ return i ;
90
+ } catch (TimeoutException e ) {
91
+ return i ;
92
+ } catch (Exception e ) {
93
+ throw new ConcurrentException (e );
94
+ }
95
+ }
96
+
97
+ /**
98
+ * 等待所有任务完成(有时间限制)
99
+ *
100
+ * @param timeout 超时时间
101
+ * @param unit 时间单位
102
+ * @param start 起始位置,从此位置开始检查任务是否已完成,如未完成,则等待给定的时间
103
+ * @param futures 任务列表
104
+ * @return 剩余未完成的任务的起始位置
105
+ */
106
+ public static int awaitAll (long timeout , TimeUnit unit , int start , ArrayList <Future <?>> futures ) {
107
+ int i = start , len = futures .size ();
108
+ try {
109
+ long nanos = unit .toNanos (timeout );
110
+ long endTime = System .nanoTime () + nanos ;
111
+
112
+ for (; i < len ; i ++) {
113
+ if (nanos < 0 ) {
114
+ return i ;
115
+ }
116
+
117
+ Future <?> future = futures .get (i );
118
+ if (future != null ) {
119
+ if (future .isDone ()) {
120
+ continue ;
121
+ }
122
+ future .get (nanos , TimeUnit .NANOSECONDS );
123
+ nanos = endTime - System .nanoTime ();
37
124
}
38
125
}
39
126
@@ -45,6 +132,54 @@ public static int awaitAll(long timeout, TimeUnit unit, int start, Future<?>[] f
45
132
}
46
133
}
47
134
135
+ /**
136
+ * 检查是否还有未完成的任务
137
+ *
138
+ * @param start 起始位置,从此位置开始检查任务是否已完成
139
+ * @param futures 任务列表
140
+ * @return 剩余未完成的任务的起始位置
141
+ */
142
+ public static int checkAll (int start , ArrayList <Future <?>> futures ) {
143
+ int i = start , len = futures .size ();
144
+ for (; i < len ; i ++) {
145
+ Future <?> future = futures .get (i );
146
+ if (future != null ) {
147
+ if (future .isDone ()) {
148
+ continue ;
149
+ }
150
+ return i ;
151
+ }
152
+ }
153
+ return i ;
154
+ }
155
+
156
+ /**
157
+ * 检查是否还有未完成的任务
158
+ *
159
+ * @param start 起始位置,从此位置开始检查任务是否已完成
160
+ * @param futures 任务列表
161
+ * @return 剩余未完成的任务的起始位置
162
+ */
163
+ public static int checkAll (int start , Future <?>[] futures ) {
164
+ int i = start , len = futures .length ;
165
+ for (; i < len ; i ++) {
166
+ Future <?> future = futures [i ];
167
+ if (future != null ) {
168
+ if (future .isDone ()) {
169
+ continue ;
170
+ }
171
+ return i ;
172
+ }
173
+ }
174
+ return i ;
175
+ }
176
+
177
+ /**
178
+ * 取消所有未完成的任务
179
+ *
180
+ * @param start 起始位置,从此位置开始取消未完成的任务
181
+ * @param futures 任务列表
182
+ */
48
183
public static void cancelAll (int start , Future <?>[] futures ) {
49
184
int i = start , len = futures .length ;
50
185
for (; i < len ; i ++) {
@@ -58,20 +193,22 @@ public static void cancelAll(int start, Future<?>[] futures) {
58
193
}
59
194
}
60
195
61
- public static void awaitAll (Future <?>[] futures ) {
62
- int i = 0 , len = futures .length ;
63
- try {
64
- for (; i < len ; i ++) {
65
- Future <?> future = futures [i ];
66
- if (future != null ) {
67
- if (future .isDone ()) {
68
- continue ;
69
- }
70
- future .get ();
196
+ /**
197
+ * 取消所有未完成的任务
198
+ *
199
+ * @param start 起始位置,从此位置开始取消未完成的任务
200
+ * @param futures 任务列表
201
+ */
202
+ public static void cancelAll (int start , ArrayList <Future <?>> futures ) {
203
+ int i = start , len = futures .size ();
204
+ for (; i < len ; i ++) {
205
+ Future <?> future = futures .get (i );
206
+ if (future != null ) {
207
+ if (future .isDone ()) {
208
+ continue ;
71
209
}
210
+ future .cancel (true );
72
211
}
73
- } catch (InterruptedException | ExecutionException e ) {
74
- throw new ConcurrentException (e );
75
212
}
76
213
}
77
214
0 commit comments