序
- 项目中利用了Java 8 的并行流(parallelStream)来优化程序处理性能:
- public static LinkedList<CycleCanSequenceDto> batchParseCloudMessageToCycleSequences(
- List<byte []> cloudMessageBytesList
- , CanHeaderConfigDto cloudMessageHeaderConfig
- ) {
- List<LinkedList<CycleCanSequenceDto>> cycleCanSequenceDtoListList = cloudMessageBytesList.parallelStream().map(cloudMessageBytes -> {//并行处理
- LinkedList<CycleCanSequenceDto> canSequenceDtos = null;
- try {
- canSequenceDtos = parseCloudMessageToCycleSequences(cloudMessageBytes, cloudMessageHeaderConfig);
- } catch (IOException e) {
- String errorMessage = "Parse cloud message to cycle sequences fail!cloudMessageBytesHex:" + BytesUtils.bytesToHexString(cloudMessageBytes);
- log.error( errorMessage );
- throw new RuntimeException(errorMessage);
- }
- return canSequenceDtos;
- } ).collect(Collectors.toCollection(LinkedList::new));
- LinkedList<CycleCanSequenceDto> cycleCanSequenceDtoList = cycleCanSequenceDtoListList.parallelStream().flatMap(cycleCanSequenceDtoListElement -> {//并行处理
- return cycleCanSequenceDtoListElement.stream();
- }).collect(Collectors.toCollection(LinkedList::new));
- return cycleCanSequenceDtoList;
- }
复制代码- @Setter
- @Getter
- public class CycleCanSequenceDto extends CycleMessageSequenceDto {
- /**
- * 获取 MessagePayloadDto 的总个数
- * @param cycleCanSequences
- * @return
- */
- public static Long getMessagePayloadSize(List<CycleCanSequenceDto> cycleCanSequences){
- AtomicLong messagePayloadSize = new AtomicLong(0);
- if(cycleCanSequences==null) {
- return -1L;
- }
- cycleCanSequences.parallelStream().forEach(cycleCanSequenceDto -> {
- Integer currentCycleCanSequenceDtoMessagePayloadSize = cycleCanSequenceDto.getContent().size();
- messagePayloadSize.addAndGet( currentCycleCanSequenceDtoMessagePayloadSize );
- });
- return messagePayloadSize.get();
- }
- }
复制代码 概述:Java 并行流(parallelStream)[JDK8 - ]
- 并行流(parallelStream)是Java 8引入的强大特性,它能够自动将流操作【并行化】,以利用多核处理器的优势。
java.util.Collection#parallelStream()
Java 8引入了流的概念去对数据进行复杂的操作,而且使用并行流(Parallel Steams)支持并发,大大加快了运行效率。
- //顺序流
- list.stream()
- .filter(i -> i > 10)
- .collect( Collectors.toList() );
- //并行流
- list.parallelStream()
- .filter(i -> i > 10)
- .collect( Collectors.toList() );
复制代码下面我们将全面探讨parallelStream的使用方法、原理和最佳实践。
并行流基础
创建并行流
- // 从集合创建并行流
- List<String> list = Arrays.asList("a", "b", "c");
- Stream<String> parallelStream = list.parallelStream();
- // 将顺序流转为并行流
- Stream<String> parallelStream2 = Stream.of("a", "b", "c").parallel();
复制代码 基本使用示例
- List<Integer> numbers = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
- // 并行计算平方和
- long sum = numbers.parallelStream()
- .mapToLong(i -> i * i)
- .sum();
复制代码 并行流工作原理
底层机制
- 将任务分割为多个子任务(fork)
- 并行执行这些子任务
- 合并结果(join)
算法思想: 分治
- 案例讲解: 以代码list.parallelStream().filter(...).collect(...)为例
- Stage链构建:通过Head节点(Stage0)和中间操作(如filter、sorted)形成双向链表,每个阶段(Stage)封装操作逻辑。
- 任务拆分:Spliterator将数据分割为多个子任务,分发到ForkJoinPool的线程队列。
- 并行执行:各线程独立处理子任务,通过opWrapSink方法将操作链应用到数据流。
- 结果合并:终端操作(如collect)调用combiner合并子任务结果。
- // 示例:ArrayList的Spliterator实现
- public Spliterator<E> spliterator() {
- return new ArrayListSpliterator<>(this, 0, -1, 0); // 初始范围[0, size)
- }
复制代码 底层框架:Fork/Join 框架
- 并行流基于Java 7引入的Fork/Join框架实现,其核心是ForkJoinPool线程池,采用工作窃取算法(Work-Stealing) 优化任务分配。
每个线程维护一个双端队列,优先处理自己的任务,空闲时窃取其他线程队列尾部的任务,最大化CPU利用率。
- ForkJoinTask:任务基类,子类包括RecursiveTask(有返回值)和RecursiveAction(无返回值)。
- Spliterator:数据拆分器,负责将数据源分割为可并行处理的子块。
例如: ArrayListSpliterator支持高效随机访问分割。
源码级关键机制解析
1) 数据拆分与合并
- Spliterator特性:通过characteristics()方法返回特性值(如ORDERED、SIZED),影响拆分策略。
例如: ArrayList支持高效平均分割,而LinkedList拆分成本高。
- 任务链构造:中间操作(如filter、map)通过StatelessOp或StatefulOp节点构建操作链,StatefulOp(如sorted)需缓存中间数据。
2) 并行流线程模型
- 默认线程池:使用ForkJoinPool.commonPool() (JVM内共享的公共线程池, 被【整个应用程序】所使用)
- 默认的线程数为: Runtime.getRuntime().availableProcessors() - 1 即: CPU核心数-1。
-1是因为还有 JVM 的主线程需要占用1个线程
- 可自定义系统属性: java.util.concurrent.ForkJoinPool.common.parallelism
最佳实践: 由于主线程也会参与任务抢占CPU,所以 ForkJoinPool.commonPool 的线程数尽量设置为 (CPU核心数*N - 1)
- // 设置全局并行度
- System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
复制代码
- 自定义线程池:可通过自定义ForkJoinPool提交任务,但需注意避免资源竞争。
支持通过 ForkJoinPool 定义私有线程池:
- ForkJoinPool forkJoinPool = new ForkJoinPool(8);
- List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream().map( e -> {
- return e + 1;
- }).collect(Collectors.toList())).get();
复制代码 适用场景
适合使用并行流的场景
- 数据量大:通常超过10,000个元素
- 计算密集型操作(CPU):如复杂的数学运算
- 无状态操作:如map、filter、flatMap等
- 独立操作:元素处理不依赖其他元素
不适合的场景
- 顺序依赖操作:如limit、findFirst等
- 有状态操作:如sorted、distinct
- I/O密集型操作:可能导致线程阻塞 (补充意见:但也不绝对不适合,有些情况下顺序执行,反而更慢)
- 小数据集:并行开销可能超过收益
性能优化技巧
正确测量性能
- long start = System.nanoTime();
- result = list.parallelStream().[...].collect(Collectors.toList());
- long duration = (System.nanoTime() - start) / 1_000_000;
- System.out.println("耗时: " + duration + " ms");
复制代码 选择合适的并行度
- // 自定义线程池
- ForkJoinPool customPool = new ForkJoinPool(4);
- customPool.submit(() -> {
- list.parallelStream().[...].collect(Collectors.toList());
- }).get();
复制代码 避免共享可变状态
- // 错误示例 - 存在竞态条件
- List<String> result = new ArrayList<>();
- list.parallelStream().forEach(s -> result.add(s.toUpperCase())); // 可能抛出异常
- // 正确做法
- List<String> safeResult = list.parallelStream()
- .map(String::toUpperCase)
- .collect(Collectors.toList());
复制代码 高级应用
自定义Spliterator
- class CustomSpliterator<T> implements Spliterator<T> {
- // 实现方法...
- }
- Spliterator<String> spliterator = new CustomSpliterator<>(data);
- Stream<String> parallelStream = StreamSupport.stream(spliterator, true);
复制代码 并行收集器
- // 使用线程安全的收集器
- Map<String, List<Student>> studentsByClass = students.parallelStream()
- .collect(Collectors.groupingByConcurrent(Student::getClassName));
复制代码 FAQ: 并行流的常见陷阱与解决方案
Q:并行流与顺序流的性能对比?
- List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000).boxed().collect(Collectors.toList());
- // 顺序流
- long seqTime = measureTime(() -> numbers.stream().reduce(0, Integer::sum));
- // 并行流
- long parTime = measureTime(() -> numbers.parallelStream().reduce(0, Integer::sum));
- System.out.println("顺序流: " + seqTime + "ms");
- System.out.println("并行流: " + parTime + "ms");
复制代码 操作数据量顺序流耗时并行流耗时求和100万15ms8ms过滤1000万120ms45ms排序100万650ms750msQ:线程安全问题
- int[] counter = new int[1];
- list.parallelStream().forEach(e -> counter[0]++); // 竞态条件
复制代码- // 使用原子类
- AtomicInteger counter = new AtomicInteger();
- list.parallelStream().forEach(e -> counter.incrementAndGet());
- // 或使用归约操作
- int sum = list.parallelStream().mapToInt(e -> 1).sum();
复制代码 Q:顺序敏感操作
- // 并行流中findFirst可能不如预期
- Optional<Integer> first = list.parallelStream()
- .filter(i -> i > 10)
- .findFirst();
复制代码- // 如需顺序保证,使用【顺序流】,而非并行流
- Optional<Integer> first = list.stream()
- .filter(i -> i > 10)
- .findFirst();
复制代码 Q:性能层面的考量:是否需要单独构建线程池?
- ✅ 建议单独构建线程池的场景
| 场景 | 原因 |
| ------------- | ------------------------------------------------------------------ |
| I/O 密集型任务 | 默认线程数较少(CPU-1),不适合阻塞操作(如 DB、HTTP),容易拖慢整个 commonPool(),影响其他并行任务 。 |
| 任务隔离需求 | 避免与其他模块共享线程池,防止任务间资源竞争、死锁或阻塞 。 |
| 需要精确控制并发度 | 自定义线程池可设置合适的线程数,避免过度切换或资源浪费 。 |
- ❌ 可不单独构建线程池的场景
场景原因CPU 密集型任务默认 commonPool() 的线程数已接近 CPU 核心数,适合计算密集型任务 。简单一次性任务代码简洁、无需复杂控制,使用默认线程池即可 。Q:最佳实践经验
- 先测试后优化:不要假设并行一定更快,实际测量性能
- 避免副作用:确保lambda表达式没有副作用
- 考虑顺序性:需要顺序保证时使用顺序流
- 合理设置并行度:根据CPU核心数和任务特性调整
- 注意数据结构:ArrayList比LinkedList更适合并行处理
- 避免自动装箱:使用原始类型流(IntStream等)提升性能
- 是否需要单独创建线程池来执行并行流?
- CPU 密集型任务:可直接使用 parallelStream(),无需额外线程池。
- I/O 密集型或关键业务:建议如下方式使用自定义 ForkJoinPool:
若任务为 I/O 密集型或对隔离性、并发度有要求,有必要单独构建线程池以提升性能与稳定性 。
- ForkJoinPool customPool = new ForkJoinPool(20); // 自定义线程数
- customPool.submit(() ->
- list.parallelStream().forEach(item -> doSomething(item))
- ).get();
- customPool.shutdown();
复制代码并行流是强大的工具,但需要谨慎使用。正确使用时可以显著提升性能,错误使用则可能导致潜在问题。理解其工作原理和适用场景是有效使用并行流的关键。
X 参考文献
- Java并行流(parallelStream)深度解析 - CSDN
- Java8并行流parallelStream原理深度解析 - CSDN
- 如何自定义ForkJoinPool提升并行流 ParallelStream执行速度 - 亿速云/大数据
- java8中修改parallelStream默认并发数 - CSDN
本文作者: 千千寰宇
本文链接: https://www.cnblogs.com/johnnyzen
关于博文:评论和私信会在第一时间回复,或直接私信我。
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
日常交流:大数据与软件开发-QQ交流群: 774386015 【入群二维码】参见左下角。您的支持、鼓励是博主技术写作的重要动力!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |