diff --git a/docs/book/24-Concurrent-Programming.md b/docs/book/24-Concurrent-Programming.md index 06856a79..b69811d5 100644 --- a/docs/book/24-Concurrent-Programming.md +++ b/docs/book/24-Concurrent-Programming.md @@ -279,7 +279,7 @@ Java 8 CompletableFuture是一个更好的解决方案:它允许您将操作 ## 并行流 -Java 8流的一个显着优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,它们使用一种特殊的迭代器,称为Spliterator,它被限制为易于自动分类。这产生了相当神奇的结果,只能说.parallel(),并且你的流中的所有东西都是作为一组并行任务运行的。如果您的代码是使用Streams编写的,那么并行化以提高速度似乎微不足道。 +Java 8流的一个显着优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,他们使用一种特殊的迭代器,称为Spliterator,它被限制为易于自动分割。这产生了相当神奇的结果,即能够简单用parallel()然后流中的所有内容都作为一组并行任务运行。如果您的代码是使用Streams编写的,那么并行化以提高速度似乎是一种琐事 例如,考虑来自Streams的Prime.java。查找质数可能是一个耗时的过程,我们可以看到该程序的计时: @@ -317,7 +317,190 @@ public class ParallelPrime { */ ``` -请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止激进的优化;如果我们没有对结果做任何事情,那么一个狡猾的编译器可能会观察到程序没有意义并且消除了计算(这不太可能,但并非不可能)。请注意使用nio2库编写文件的简单性(在[文件](./17-Files.md)一章中有描述)。 +请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止过激的优化;如果我们没有对结果做任何事情,那么一个高级的编译器可能会观察到程序没有意义并且消除了计算(这不太可能,但并非不可能)。请注意使用nio2库编写文件的简单性(在[文件](./17-Files.md)一章中有描述)。 + +当我注释掉[1] parallel()行时,我的结果大约是parallel()的三倍。 + +并行流似乎是一个甜蜜的交易。您所需要做的就是将编程问题转换为流,然后插入parallel()以加快速度。实际上,有时候这很容易。但遗憾的是,有许多陷阱。 + +- parallel()不是灵丹妙药 + +作为对流和并行流的不确定性的探索,让我们看一个看似简单的问题:求和数字的增量序列。事实证明这是一个令人惊讶的数量,并且我将冒险将它们进行比较 - 试图小心,但承认我可能会在计时代码执行时遇到许多基本陷阱之一。结果可能有一些缺陷(例如JVM没有“升温”),但我认为它仍然提供了一些有用的指示。 + +我将从一个计时方法rigorously 开始,它采用**LongSupplier**,测量**getAsLong()**调用的长度,将结果与**checkValue**进行比较并显示结果。 + +请注意,一切都必须严格使用**long**;我花了一些时间发现隐蔽的溢出,然后才意识到在重要的地方错过了**long**。 + +所有关于时间和内存的数字和讨论都是指“我的机器”。你的经历可能会有所不同。 + +```java +// concurrent/Summing.java +import java.util.stream.*; +import java.util.function.*; +import onjava.Timer; +public class Summing { + static void timeTest(String id, long checkValue, LongSupplier operation){ + System.out.print(id + ": "); + Timer timer = newTimer(); + long result = operation.getAsLong(); + if(result == checkValue) + System.out.println(timer.duration() + "ms"); + else + System.out.format("result: %d%ncheckValue: %d%n", result, checkValue); + } + public static final int SZ = 100_000_000;// This even works:// + public static final int SZ = 1_000_000_000; + public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; // Gauss's formula + public static void main(String[] args){ + System.out.println(CHECK); + timeTest("Sum Stream", CHECK, () -> + LongStream.rangeClosed(0, SZ).sum()); + timeTest("Sum Stream Parallel", CHECK, () -> + LongStream.rangeClosed(0, SZ).parallel().sum()); + timeTest("Sum Iterated", CHECK, () -> + LongStream.iterate(0, i -> i + 1) + .limit(SZ+1).sum()); + // Slower & runs out of memory above 1_000_000: + // timeTest("Sum Iterated Parallel", CHECK, () -> + // LongStream.iterate(0, i -> i + 1) + // .parallel() + // .limit(SZ+1).sum()); + } +} +/* Output:5000000050000000 +Sum Stream: 167ms +Sum Stream Parallel: 46ms +Sum Iterated: 284ms +*/ +``` + +**CHECK**值是使用Carl Friedrich Gauss在1700年代后期仍在小学时创建的公式计算出来的. + + **main()** 的第一个版本使用直接生成 **Stream** 并调用 **sum()** 的方法。我们看到流的好处在于十亿分之一的SZ在没有溢出的情况下处理(我使用较小的数字,因此程序运行时间不长)。使用 **parallel()** 的基本范围操跟快。 + +如果使用**iterate()**来生成序列,则减速是戏剧性的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,那么结果通常比非并行版本花费的时间更长,但是当**SZ**超过一百万时,它也会耗尽内存(在某些机器上)。当然,当你可以使用**range()**时,你不会使用**iterate()**,但如果你生成的东西不是简单的序列,你必须使用**iterate()**。应用**parallel()**是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察: + +- 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。 +- 阵列分割成本低廉,均匀且具有完美的分裂知识。 +- 链接列表没有这些属性;“拆分”一个链表仅仅意味着把它分成“第一元素”和“其余列表”,这相对无用。 +- 无状态生成器的行为类似于数组;使用上述范围是无可争议的。 +- 迭代生成器的行为类似于链表; **iterate()** 是一个迭代生成器。 + +现在让我们尝试通过在数组中填充值来填充数组来解决问题。因为数组只分配了一次,所以我们不太可能遇到垃圾收集时序问题。 + +首先我们将尝试一个充满原始**long**的数组: + +```java +// concurrent/Summing2.java +// {ExcludeFromTravisCI}import java.util.*; +public class Summing2 { + static long basicSum(long[] ia) { + long sum = 0; + int size = ia.length; + for(int i = 0; i < size; i++) + sum += ia[i];return sum; + } + // Approximate largest value of SZ before + // running out of memory on mymachine: + public static final int SZ = 20_000_000; + public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; + public static void main(String[] args) { + System.out.println(CHECK); + long[] la = newlong[SZ+1]; + Arrays.parallelSetAll(la, i -> i); + Summing.timeTest("Array Stream Sum", CHECK, () -> + Arrays.stream(la).sum()); + Summing.timeTest("Parallel", CHECK, () -> + Arrays.stream(la).parallel().sum()); + Summing.timeTest("Basic Sum", CHECK, () -> + basicSum(la));// Destructive summation: + Summing.timeTest("parallelPrefix", CHECK, () -> { + Arrays.parallelPrefix(la, Long::sum) + return la[la.length - 1]; + }); + } +} +/* Output:200000010000000 +Array Stream +Sum: 104ms +Parallel: 81ms +Basic Sum: 106ms +parallelPrefix: 265ms +*/ +``` + +第一个限制是内存大小;因为数组是预先分配的,所以我们不能创建几乎与以前版本一样大的任何东西。并行化可以加快速度,甚至比使用 **basicSum()** 循环更快。有趣的是, **Arrays.parallelPrefix()** 似乎实际上减慢了速度。但是,这些技术中的任何一种在其他条件下都可能更有用 - 这就是为什么你不能做出任何确定性的声明,除了“你必须尝试一下”。” + +最后,考虑使用盒装**Long**的效果: + +```java +// concurrent/Summing3.java +// {ExcludeFromTravisCI} +import java.util.*; +public class Summing3 { + static long basicSum(Long[] ia) { + long sum = 0; + int size = ia.length; + for(int i = 0; i < size; i++) + sum += ia[i]; + return sum; + } + // Approximate largest value of SZ before + // running out of memory on my machine: + public static final int SZ = 10_000_000; + public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; + public static void main(String[] args) { + System.out.println(CHECK); + Long[] aL = newLong[SZ+1]; + Arrays.parallelSetAll(aL, i -> (long)i); + Summing.timeTest("Long Array Stream Reduce", CHECK, () -> + Arrays.stream(aL).reduce(0L, Long::sum)); + Summing.timeTest("Long Basic Sum", CHECK, () -> + basicSum(aL)); + // Destructive summation: + Summing.timeTest("Long parallelPrefix",CHECK, ()-> { + Arrays.parallelPrefix(aL, Long::sum); + return aL[aL.length - 1]; + }); + } +} +/* Output:50000005000000 +Long Array +Stream Reduce: 1038ms +Long Basic +Sum: 21ms +Long parallelPrefix: 3616ms +*/ +``` + +现在可用的内存量大约减半,并且所有情况下所需的时间都会很长,除了**basicSum()**,它只是循环遍历数组。令人惊讶的是, **Arrays.parallelPrefix()** 比任何其他方法都要花费更长的时间。 + +我将 **parallel()** 版本分开了,因为在上面的程序中运行它导致了一个冗长的垃圾收集,扭曲了结果: + +```java +// concurrent/Summing4.java +// {ExcludeFromTravisCI} +import java.util.*; +public class Summing4 { + public static void main(String[] args) { + System.out.println(Summing3.CHECK); + Long[] aL = newLong[Summing3.SZ+1]; + Arrays.parallelSetAll(aL, i -> (long)i); + Summing.timeTest("Long Parallel", + Summing3.CHECK, () -> + Arrays.stream(aL) + .parallel() + .reduce(0L,Long::sum)); + } +} +/* Output:50000005000000 +Long Parallel: 1014ms +*/ +``` + +它比非parallel()版本略快,但并不显着。 + +这种时间增加的一个重要原因是处理器内存缓存。使用**Summing2.java**中的原始**long**,数组**la**是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 **Long parallelPrefix** 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为**Long**生成一个超出缓存的引用。 ## 创建和运行任务