@@ -396,7 +396,7 @@ Sum Iterated: 284ms
396
396
397
397
** main()** 的第一个版本使用直接生成 ** Stream** 并调用 ** sum()** 的方法。我们看到流的好处在于即使SZ为十亿(1_000_000_000)程序也可以很好地处理而没有溢出(为了让程序运行得快一点,我使用了较小的数字)。使用 ** parallel()** 的基本范围操作明显更快。
398
398
399
- 如果使用** iterate()** 来生成序列,则减速是相当明显的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,当** SZ** 超过一百万时,结果不仅比非并行版本花费的时间更长,而且也会耗尽内存(在某些机器上)。当然,当你可以使用** range()** 时,你不会使用** iterate()** ,但如果你生成的东西不是简单的序列,你必须使用** iterate()** 。应用** parallel()** 是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察:
399
+ 如果使用** iterate()** 来生成序列,则减速是相当明显的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,当** SZ** 超过一百万时,结果不仅比非并行版本花费的时间更长,而且也会耗尽内存(在某些机器上)。当然,当你可以使用** range()** 时,你不会使用** iterate()** ,但如果你生成的东西不是简单的序列,你必须使用** iterate()** 。应用** parallel()** 是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察:
400
400
401
401
- 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。
402
402
- 数组分割成本低,分割均匀且对分割的大小有着完美的掌控。
@@ -554,9 +554,9 @@ Long Parallel: 1008ms**
554
554
555
555
- parallel()/limit()交点
556
556
557
- 使用** parallel()** 时会有更复杂的问题。从其他语言中吸取的流机制被设计为大约是一个无限的流模型。如果你拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果你使用无限流,则使用针对流优化的算法。
557
+ 使用 ** parallel()** 时会有更复杂的问题。从其他语言中吸取的流机制被设计为大约是一个无限的流模型。如果你拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果你使用无限流,则使用针对流优化的算法。
558
558
559
- Java 8将两者合并起来。例如,** Collections** 没有内置的** map()** 操作。在** Collection** 和** Map** 中唯一类似流的批处理操作是** forEach()** 。如果要执行** map()** 和** reduce()** 等操作,必须首先将** Collection** 转换为存在这些操作的** Stream** :
559
+ Java 8将两者合并起来。例如,** Collections** 没有内置的** map()** 操作。在** Collection** 和** Map** 中唯一类似流的批处理操作是** forEach()** 。如果要执行** map()** 和** reduce()** 等操作,必须首先将** Collection** 转换为存在这些操作的** Stream** :
560
560
561
561
``` java
562
562
// concurrent/CollectionIntoStream.java
@@ -595,9 +595,9 @@ bynxt
595
595
:PENCUXGVGINNLOZVEWPPCPOALJLNXT
596
596
```
597
597
598
- ** Collection** 确实有一些批处理操作,如** removeAll()** ,** removeIf()** 和** retainAll()** ,但这些都是破坏性的操作。** ConcurrentHashMap** 对** forEach** 和** reduce** 操作有特别广泛的支持。
598
+ ** Collection** 确实有一些批处理操作,如** removeAll()** ,** removeIf()** 和** retainAll()** ,但这些都是破坏性的操作。** ConcurrentHashMap** 对** forEach** 和** reduce** 操作有特别广泛的支持。
599
599
600
- 在许多情况下,只在集合上调用** stream()** 或者** parallelStream()** 没有问题。但是,有时将** Stream** 与** Collection** 混合会产生意想不到的结果。这是一个有趣的难题:
600
+ 在许多情况下,只在集合上调用** stream()** 或者** parallelStream()** 没有问题。但是,有时将** Stream** 与** Collection** 混合会产生意想不到的结果。这是一个有趣的难题:
601
601
602
602
``` java
603
603
// concurrent/ParallelStreamPuzzle.java
@@ -672,11 +672,12 @@ public class ParallelStreamPuzzle2 {
672
672
[0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ]
673
673
```
674
674
675
- current是使用线程安全的 ** AtomicInteger ** 类定义的,可以防止竞争条件;** parallel()** 允许多个线程调用** get()** 。
675
+ current是使用线程安全的 ** AtomicInteger ** 类定义的,可以防止竞争条件;** parallel()** 允许多个线程调用** get()** 。
676
676
677
- 在查看 ** PSP2 . txt** . ** IntGenerator . get()** 被调用1024 次时,你可能会感到惊讶。
677
+ 在查看 ** PSP2 . txt** . ** IntGenerator . get()** 被调用1024 次时,你可能会感到惊讶。
678
678
679
- ** 0 : main
679
+ ```
680
+ 0 : main
680
681
1 : ForkJoinPool . commonPool- worker- 1
681
682
2 : ForkJoinPool . commonPool- worker- 2
682
683
3 : ForkJoinPool . commonPool- worker- 2
@@ -698,7 +699,8 @@ current是使用线程安全的 **AtomicInteger** 类定义的,可以防止竞
698
699
20 : ForkJoinPool . commonPool- worker- 110
699
700
21 : ForkJoinPool . commonPool- worker- 110
700
701
22 : ForkJoinPool . commonPool- worker- 110
701
- 23 : ForkJoinPool . commonPool- worker- 1 **
702
+ 23 : ForkJoinPool . commonPool- worker- 1
703
+ ```
702
704
703
705
这个块大小似乎是内部实现的一部分(尝试使用`limit()` 的不同参数来查看不同的块大小)。将`parallel()`与`limit()`结合使用可以预取一串值,作为流输出。
704
706
@@ -741,17 +743,17 @@ public class ParallelStreamPuzzle3 {
741
743
[0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ]
742
744
```
743
745
744
- 为了表明** parallel ()**确实有效,我添加了一个对**peek ()**的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。
746
+ 为了表明** parallel ()**确实有效,我添加了一个对**peek ()** 的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。
745
747
746
- 你还可以看到**boxed ()**的添加,它接受**int**流并将其转换为**Integer**流。
748
+ 你还可以看到**boxed ()** 的添加,它接受**int**流并将其转换为**Integer**流。
747
749
748
750
现在我们得到多个线程产生不同的值,但它只产生10个请求的值,而不是1024个产生10个值。
749
751
750
- 它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。很难想象什么时候用并行生成一个简单的数字序列会有意义。如果你要生成的东西需要很高的成本,它可能有意义 - 但这都是猜测。只有通过测试我们才能知道用并行是否有效。记住这句格言:“首先使它工作,然后使它更快地工作 - 只有当你必须这样做时。”将**parallel ()**和**limit ()**结合使用仅供专家操作(把话说在前面,我不认为自己是这里的专家)。
752
+ 它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。很难想象什么时候用并行生成一个简单的数字序列会有意义。如果你要生成的东西需要很高的成本,它可能有意义 - 但这都是猜测。只有通过测试我们才能知道用并行是否有效。记住这句格言:“首先使它工作,然后使它更快地工作 - 只有当你必须这样做时。”将**parallel ()** 和**limit ()** 结合使用仅供专家操作(把话说在前面,我不认为自己是这里的专家)。
751
753
752
754
- 并行流只看起来很容易
753
755
754
- 实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如你所见,仅仅将**parallel ()**加到你的Stream操作上并不一定是安全的事情。在使用**parallel ()**之前,你必须了解并行性如何帮助或损害你的操作。一个基本认知错误就是认为使用并行性总是一个好主意。事实上并不是。Stream意味着你不需要重写所有代码便可以并行运行它。但是流的出现并不意味着你可以不用理解并行的原理以及不用考虑并行是否真的有助于实现你的目标。
756
+ 实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如你所见,仅仅将**parallel ()** 加到你的Stream操作上并不一定是安全的事情。在使用**parallel ()** 之前,你必须了解并行性如何帮助或损害你的操作。一个基本认知错误就是认为使用并行性总是一个好主意。事实上并不是。Stream意味着你不需要重写所有代码便可以并行运行它。但是流的出现并不意味着你可以不用理解并行的原理以及不用考虑并行是否真的有助于实现你的目标。
755
757
756
758
## 创建和运行任务
757
759
@@ -788,7 +790,7 @@ public class NapTask implements Runnable {
788
790
}
789
791
```
790
792
791
- 这只是一个** Runnable**:一个包含**run ()**方法的类。它没有包含实际运行任务的机制。我们使用**Nap**类中的“sleep”:
793
+ 这只是一个** Runnable**:一个包含**run ()** 方法的类。它没有包含实际运行任务的机制。我们使用**Nap**类中的“sleep”:
792
794
793
795
```java
794
796
// onjava/Nap.java
@@ -810,9 +812,9 @@ public class Nap {
810
812
```
811
813
为了消除异常处理的视觉干扰,这被定义为实用程序。第二个构造函数在超时时显示一条消息
812
814
813
- 对** TimeUnit.MILLISECONDS.sleep ()**的调用获取“当前线程”并在参数中将其置于休眠状态,这意味着该线程被挂起。这并不意味着底层处理器停止。操作系统将其切换到其他任务,例如在你的计算机上运行另一个窗口。OS任务管理器定期检查**sleep ()**是否超时。当它执行时,线程被“唤醒”并给予更多处理时间。
815
+ 对** TimeUnit.MILLISECONDS.sleep ()** 的调用获取“当前线程”并在参数中将其置于休眠状态,这意味着该线程被挂起。这并不意味着底层处理器停止。操作系统将其切换到其他任务,例如在你的计算机上运行另一个窗口。OS任务管理器定期检查**sleep ()** 是否超时。当它执行时,线程被“唤醒”并给予更多处理时间。
814
816
815
- 你可以看到**sleep ()**抛出一个受检的**InterruptedException**;这是原始Java 设计中的一个工件,它通过突然断开它们来终止任务。因为它往往会产生不稳定的状态,所以后来不鼓励终止。但是,我们必须在需要或仍然发生终止的情况下捕获异常。
817
+ 你可以看到**sleep ()** 抛出一个受检的**InterruptedException**;这是原始Java 设计中的一个工件,它通过突然断开它们来终止任务。因为它往往会产生不稳定的状态,所以后来不鼓励终止。但是,我们必须在需要或仍然发生终止的情况下捕获异常。
816
818
817
819
要执行任务,我们将从最简单的方法-- SingleThreadExecutor 开始:
818
820
@@ -867,7 +869,7 @@ main awaiting termination
867
869
NapTask [9 ] pool- 1 - thread- 1
868
870
```
869
871
870
- 首先请注意,没有** SingleThreadExecutor ** 类。** newSingleThreadExecutor()** 是 ** Executors ** 中的工厂 ,它创建特定类型的[^ 4 ]
872
+ 首先请注意,没有** SingleThreadExecutor ** 类。** newSingleThreadExecutor()** 是 ** Executors ** 中的一个工厂方法 ,它创建特定类型的** ExecutorService ** [^ 4 ]
871
873
872
874
我创建了十个NapTasks 并将它们提交给ExecutorService ,这意味着它们开始自己运行。然而,在此期间,main()继续做事。当我运行callexec. shutdown()时,它告诉ExecutorService 完成已经提交的任务,但不接受任何新任务。此时,这些任务仍然在运行,因此我们必须等到它们在退出main()之前完成。这是通过检查exec. isTerminated()来实现的,这在所有任务完成后变为true 。
873
875
@@ -932,7 +934,7 @@ public class MoreTasksAfterShutdown {
932
934
java.util.concurrent. RejectedExecutionException : TaskNapTask [99 ] rejected from java.util.concurrent. ThreadPoolExecutor @4e25154f [Shutting down, pool size = 1 ,active threads = 1 , queued tasks = 0 , completed tasks = 0 ]NapTask [1 ] pool- 1 - thread- 1
933
935
```
934
936
935
- ** exec. shutdown()** 的替代方法是** exec. shutdownNow()** ,它除了不接受新任务外,还会尝试通过中断任务来停止任何当前正在运行的任务。同样,中断是错误的,容易出错并且不鼓励。
937
+ ** exec. shutdown()** 的替代方法是** exec. shutdownNow()** ,它除了不接受新任务外,还会尝试通过中断任务来停止任何当前正在运行的任务。同样,中断是错误的,容易出错并且不鼓励。
936
938
937
939
- 使用更多线程
938
940
0 commit comments