Skip to content

Commit d593400

Browse files
committed
并发编程 ForkJoin框架
1 parent 0cf36cf commit d593400

File tree

3 files changed

+182
-0
lines changed

3 files changed

+182
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.xiaolyuh;
2+
3+
import java.util.Objects;
4+
import java.util.concurrent.ForkJoinPool;
5+
import java.util.concurrent.RecursiveTask;
6+
7+
/**
8+
* 计算1+2+3+...+n的值
9+
* 使用同步执行的方式
10+
*
11+
* @author yuhao.wang3
12+
* @since 2019/6/25 17:07
13+
*/
14+
public class ForkJoinCountTask extends RecursiveTask<Long> {
15+
16+
/**
17+
* 阀值
18+
*/
19+
private int threshold = 10;
20+
21+
/**
22+
* 任务的开始值
23+
*/
24+
private long start;
25+
26+
/**
27+
* 任务的结束值
28+
*/
29+
private long end;
30+
31+
public ForkJoinCountTask(long start, long end) {
32+
this.start = start;
33+
this.end = end;
34+
}
35+
36+
@Override
37+
protected Long compute() {
38+
if (end - start <= threshold) {
39+
long count = 0;
40+
for (int i = 0; i <= end - start; i++) {
41+
count = count + start + i;
42+
}
43+
return count;
44+
} else {
45+
// 如果任务大于阈值,就分裂成三个子任务计算
46+
long slip = (end - start) / 3;
47+
ForkJoinCountTask oneTask = new ForkJoinCountTask(start, start + slip);
48+
ForkJoinCountTask twoTask = new ForkJoinCountTask(start + slip + 1, start + slip * 2);
49+
ForkJoinCountTask threeTask = new ForkJoinCountTask(start + slip * 2 + 1, end);
50+
// 提交子任务到框架去执行
51+
invokeAll(oneTask, twoTask, threeTask);
52+
// 等待子任务执行完,得到其结果,并合并子任务
53+
return oneTask.join() + twoTask.join() + threeTask.join();
54+
}
55+
}
56+
57+
public static void main(String[] args) {
58+
long start = System.currentTimeMillis();
59+
ForkJoinPool pool = new ForkJoinPool();
60+
// 生成一个计算任务,负责计算1+2+3+n
61+
ForkJoinCountTask countTask = new ForkJoinCountTask(1, 1000000);
62+
// 执行一个任务(同步执行,任务会阻塞在这里直到任务执行完成)
63+
pool.invoke(countTask);
64+
// 异常检查
65+
if (countTask.isCompletedAbnormally()) {
66+
Throwable throwable = countTask.getException();
67+
if (Objects.nonNull(throwable)) {
68+
System.out.println(throwable.getMessage());
69+
}
70+
}
71+
// join方法是一个阻塞方法,会等待任务执行完成
72+
System.out.println("计算为:" + countTask.join() + ", 耗时:" + (System.currentTimeMillis() - start) + "毫秒");
73+
}
74+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.xiaolyuh;
2+
3+
import java.io.File;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.Objects;
7+
import java.util.concurrent.ForkJoinPool;
8+
import java.util.concurrent.RecursiveAction;
9+
10+
/**
11+
* 搜索指定目录下的指定文件
12+
* 使用异步执行的方式
13+
*
14+
* @author yuhao.wang3
15+
* @since 2019/6/25 17:07
16+
*/
17+
public class ForkJoinSearchFileTask extends RecursiveAction {
18+
19+
/**
20+
* 指定目录
21+
*/
22+
private File file;
23+
24+
/**
25+
* 文件后缀
26+
*/
27+
private String suffix;
28+
29+
public ForkJoinSearchFileTask(File file, String suffix) {
30+
this.file = file;
31+
this.suffix = suffix;
32+
}
33+
34+
@Override
35+
protected void compute() {
36+
if (Objects.isNull(file)) {
37+
return;
38+
}
39+
40+
File[] files = file.listFiles();
41+
List<ForkJoinSearchFileTask> fileTasks = new ArrayList<>();
42+
if (Objects.nonNull(files)) {
43+
for (File f : files) {
44+
// 拆分任务
45+
if (f.isDirectory()) {
46+
fileTasks.add(new ForkJoinSearchFileTask(f, suffix));
47+
} else {
48+
if (f.getAbsolutePath().endsWith(suffix)) {
49+
System.out.println("文件: " + f.getAbsolutePath());
50+
}
51+
}
52+
}
53+
// 提交并执行任务
54+
invokeAll(fileTasks);
55+
for (ForkJoinSearchFileTask fileTask : fileTasks) {
56+
// 等待任务执行完成
57+
fileTask.join();
58+
}
59+
}
60+
}
61+
62+
public static void main(String[] args) throws Exception {
63+
File file = new File("d:/");
64+
ForkJoinPool pool = new ForkJoinPool();
65+
// 生成一个计算任务,负责查找指定木目录
66+
ForkJoinSearchFileTask searchFileTask = new ForkJoinSearchFileTask(file, ".txt");
67+
// 异步执行一个任务
68+
pool.execute(searchFileTask);
69+
70+
Thread.sleep(10);
71+
72+
// 做另外的事情
73+
int count = 0;
74+
for (int i = 0; i < 1000; i++) {
75+
count += i;
76+
}
77+
System.out.println("计算任务:" + count);
78+
79+
// join方法是一个阻塞方法,会等待任务执行完成
80+
searchFileTask.join();
81+
}
82+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.xiaolyuh;
2+
3+
/**
4+
* 守护线程
5+
*
6+
* @author yuhao.wang3
7+
*/
8+
public class JoinThread {
9+
10+
public static void main(String[] args) throws InterruptedException {
11+
Thread thread1 = new Thread(() -> System.out.println("线程1 " + Thread.currentThread().getName() + "执行完了"));
12+
13+
Thread thread2 = new Thread(() -> {
14+
System.out.println("线程2 " + Thread.currentThread().getName() + " 开始执行");
15+
try {
16+
thread1.join();
17+
} catch (InterruptedException e) {
18+
e.printStackTrace();
19+
}
20+
System.out.println("线程2 " + Thread.currentThread().getName() + "执行完了");
21+
});
22+
// thread.setDaemon(true);
23+
// thread.start();
24+
Thread.sleep(500);
25+
}
26+
}

0 commit comments

Comments
 (0)