Skip to content

Parallel Streams #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 22, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 185 additions & 2 deletions docs/book/24-Concurrent-Programming.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ Java 8 CompletableFuture是一个更好的解决方案:它允许您将操作
<!-- Parallel Streams -->
## 并行流

Java 8流的一个显着优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,它们使用一种特殊的迭代器,称为Spliterator,它被限制为易于自动分类。这产生了相当神奇的结果,只能说.parallel(),并且你的流中的所有东西都是作为一组并行任务运行的。如果您的代码是使用Streams编写的,那么并行化以提高速度似乎微不足道。
Java 8流的一个显着优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,他们使用一种特殊的迭代器,称为Spliterator,它被限制为易于自动分割。这产生了相当神奇的结果,即能够简单用parallel()然后流中的所有内容都作为一组并行任务运行。如果您的代码是使用Streams编写的,那么并行化以提高速度似乎是一种琐事

例如,考虑来自Streams的Prime.java。查找质数可能是一个耗时的过程,我们可以看到该程序的计时:

Expand Down Expand Up @@ -317,7 +317,190 @@ public class ParallelPrime {
*/
```

请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止激进的优化;如果我们没有对结果做任何事情,那么一个狡猾的编译器可能会观察到程序没有意义并且消除了计算(这不太可能,但并非不可能)。请注意使用nio2库编写文件的简单性(在[文件](./17-Files.md)一章中有描述)。
请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止过激的优化;如果我们没有对结果做任何事情,那么一个高级的编译器可能会观察到程序没有意义并且消除了计算(这不太可能,但并非不可能)。请注意使用nio2库编写文件的简单性(在[文件](./17-Files.md)一章中有描述)。

当我注释掉[1] parallel()行时,我的结果大约是parallel()的三倍。

并行流似乎是一个甜蜜的交易。您所需要做的就是将编程问题转换为流,然后插入parallel()以加快速度。实际上,有时候这很容易。但遗憾的是,有许多陷阱。

- parallel()不是灵丹妙药

作为对流和并行流的不确定性的探索,让我们看一个看似简单的问题:求和数字的增量序列。事实证明这是一个令人惊讶的数量,并且我将冒险将它们进行比较 - 试图小心,但承认我可能会在计时代码执行时遇到许多基本陷阱之一。结果可能有一些缺陷(例如JVM没有“升温”),但我认为它仍然提供了一些有用的指示。

我将从一个计时方法rigorously 开始,它采用**LongSupplier**,测量**getAsLong()**调用的长度,将结果与**checkValue**进行比较并显示结果。

请注意,一切都必须严格使用**long**;我花了一些时间发现隐蔽的溢出,然后才意识到在重要的地方错过了**long**。

所有关于时间和内存的数字和讨论都是指“我的机器”。你的经历可能会有所不同。

```java
// concurrent/Summing.java
import java.util.stream.*;
import java.util.function.*;
import onjava.Timer;
public class Summing {
static void timeTest(String id, long checkValue, LongSupplier operation){
System.out.print(id + ": ");
Timer timer = newTimer();
long result = operation.getAsLong();
if(result == checkValue)
System.out.println(timer.duration() + "ms");
else
System.out.format("result: %d%ncheckValue: %d%n", result, checkValue);
}
public static final int SZ = 100_000_000;// This even works://
public static final int SZ = 1_000_000_000;
public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; // Gauss's formula
public static void main(String[] args){
System.out.println(CHECK);
timeTest("Sum Stream", CHECK, () ->
LongStream.rangeClosed(0, SZ).sum());
timeTest("Sum Stream Parallel", CHECK, () ->
LongStream.rangeClosed(0, SZ).parallel().sum());
timeTest("Sum Iterated", CHECK, () ->
LongStream.iterate(0, i -> i + 1)
.limit(SZ+1).sum());
// Slower & runs out of memory above 1_000_000:
// timeTest("Sum Iterated Parallel", CHECK, () ->
// LongStream.iterate(0, i -> i + 1)
// .parallel()
// .limit(SZ+1).sum());
}
}
/* Output:5000000050000000
Sum Stream: 167ms
Sum Stream Parallel: 46ms
Sum Iterated: 284ms
*/
```

**CHECK**值是使用Carl Friedrich Gauss在1700年代后期仍在小学时创建的公式计算出来的.

**main()** 的第一个版本使用直接生成 **Stream** 并调用 **sum()** 的方法。我们看到流的好处在于十亿分之一的SZ在没有溢出的情况下处理(我使用较小的数字,因此程序运行时间不长)。使用 **parallel()** 的基本范围操跟快。

如果使用**iterate()**来生成序列,则减速是戏剧性的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,那么结果通常比非并行版本花费的时间更长,但是当**SZ**超过一百万时,它也会耗尽内存(在某些机器上)。当然,当你可以使用**range()**时,你不会使用**iterate()**,但如果你生成的东西不是简单的序列,你必须使用**iterate()**。应用**parallel()**是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察:

- 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。
- 阵列分割成本低廉,均匀且具有完美的分裂知识。
- 链接列表没有这些属性;“拆分”一个链表仅仅意味着把它分成“第一元素”和“其余列表”,这相对无用。
- 无状态生成器的行为类似于数组;使用上述范围是无可争议的。
- 迭代生成器的行为类似于链表; **iterate()** 是一个迭代生成器。

现在让我们尝试通过在数组中填充值来填充数组来解决问题。因为数组只分配了一次,所以我们不太可能遇到垃圾收集时序问题。

首先我们将尝试一个充满原始**long**的数组:

```java
// concurrent/Summing2.java
// {ExcludeFromTravisCI}import java.util.*;
public class Summing2 {
static long basicSum(long[] ia) {
long sum = 0;
int size = ia.length;
for(int i = 0; i < size; i++)
sum += ia[i];return sum;
}
// Approximate largest value of SZ before
// running out of memory on mymachine:
public static final int SZ = 20_000_000;
public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
public static void main(String[] args) {
System.out.println(CHECK);
long[] la = newlong[SZ+1];
Arrays.parallelSetAll(la, i -> i);
Summing.timeTest("Array Stream Sum", CHECK, () ->
Arrays.stream(la).sum());
Summing.timeTest("Parallel", CHECK, () ->
Arrays.stream(la).parallel().sum());
Summing.timeTest("Basic Sum", CHECK, () ->
basicSum(la));// Destructive summation:
Summing.timeTest("parallelPrefix", CHECK, () -> {
Arrays.parallelPrefix(la, Long::sum)
return la[la.length - 1];
});
}
}
/* Output:200000010000000
Array Stream
Sum: 104ms
Parallel: 81ms
Basic Sum: 106ms
parallelPrefix: 265ms
*/
```

第一个限制是内存大小;因为数组是预先分配的,所以我们不能创建几乎与以前版本一样大的任何东西。并行化可以加快速度,甚至比使用 **basicSum()** 循环更快。有趣的是, **Arrays.parallelPrefix()** 似乎实际上减慢了速度。但是,这些技术中的任何一种在其他条件下都可能更有用 - 这就是为什么你不能做出任何确定性的声明,除了“你必须尝试一下”。”

最后,考虑使用盒装**Long**的效果:

```java
// concurrent/Summing3.java
// {ExcludeFromTravisCI}
import java.util.*;
public class Summing3 {
static long basicSum(Long[] ia) {
long sum = 0;
int size = ia.length;
for(int i = 0; i < size; i++)
sum += ia[i];
return sum;
}
// Approximate largest value of SZ before
// running out of memory on my machine:
public static final int SZ = 10_000_000;
public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
public static void main(String[] args) {
System.out.println(CHECK);
Long[] aL = newLong[SZ+1];
Arrays.parallelSetAll(aL, i -> (long)i);
Summing.timeTest("Long Array Stream Reduce", CHECK, () ->
Arrays.stream(aL).reduce(0L, Long::sum));
Summing.timeTest("Long Basic Sum", CHECK, () ->
basicSum(aL));
// Destructive summation:
Summing.timeTest("Long parallelPrefix",CHECK, ()-> {
Arrays.parallelPrefix(aL, Long::sum);
return aL[aL.length - 1];
});
}
}
/* Output:50000005000000
Long Array
Stream Reduce: 1038ms
Long Basic
Sum: 21ms
Long parallelPrefix: 3616ms
*/
```

现在可用的内存量大约减半,并且所有情况下所需的时间都会很长,除了**basicSum()**,它只是循环遍历数组。令人惊讶的是, **Arrays.parallelPrefix()** 比任何其他方法都要花费更长的时间。

我将 **parallel()** 版本分开了,因为在上面的程序中运行它导致了一个冗长的垃圾收集,扭曲了结果:

```java
// concurrent/Summing4.java
// {ExcludeFromTravisCI}
import java.util.*;
public class Summing4 {
public static void main(String[] args) {
System.out.println(Summing3.CHECK);
Long[] aL = newLong[Summing3.SZ+1];
Arrays.parallelSetAll(aL, i -> (long)i);
Summing.timeTest("Long Parallel",
Summing3.CHECK, () ->
Arrays.stream(aL)
.parallel()
.reduce(0L,Long::sum));
}
}
/* Output:50000005000000
Long Parallel: 1014ms
*/
```

它比非parallel()版本略快,但并不显着。

这种时间增加的一个重要原因是处理器内存缓存。使用**Summing2.java**中的原始**long**,数组**la**是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 **Long parallelPrefix** 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为**Long**生成一个超出缓存的引用。

<!-- Creating and Running Tasks -->
## 创建和运行任务
Expand Down