600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Java-多线程-Future FutureTask CompletionService CompletableFuture解决多线程并发中归集问题的效率对比

Java-多线程-Future FutureTask CompletionService CompletableFuture解决多线程并发中归集问题的效率对比

时间:2022-10-01 02:47:13

相关推荐

Java-多线程-Future FutureTask CompletionService CompletableFuture解决多线程并发中归集问题的效率对比

转载声明

本文大量内容系转载自以下文章,有删改,并参考其他文档资料加入了一些内容:

【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比

作者:YourBatman

出处:CSDN

CompletableFuture使用大全,简单易懂

作者:未知

出处:拉钩

Java CompletableFuture 使用详解和例子

作者:鸟窝

出处:colobu

Java 8 CompletableFuture 小教程

作者:yexiaobai

出处:SegmentFault

CompletableFuture 原理浅析

作者:luoxn28

出处:CSDN

转载仅为方便学习查看,一切权利属于原作者,本人只是做了整理和排版,如果带来不便请联系我删除。

摘要

开启线程执行任务,不管是使用Runnable(无返回值不支持上报异常)还是Callable(有返回值支持上报异常)接口,都可以轻松实现。那么如果是开启线程池并需要获取结果归集的情况下,如何实现,以及优劣?

本文将分别以这四种方式解决归集的问题,然后看看效率和使用的方便程度即可

1 Future

1.1 概述

Future接口封装了:

取消,获取线程结果,使用状态判断是否取消,使用状态判断是否完成

等几个方法,很实用

1.2 Demo

使用线程池提交Callable接口任务,返回Future接口,添加进list,最后遍历该List且内部使用while轮询,并发获取结果,代码如下

/*** 使用Futrue来实现多线程执行归集操作** @author fangshixiang@* @description //* @date /10/31 11:02*/public class FutureDemo {public static void main(String[] args) {Long start = Instant.now().toEpochMilli();//定义一个线程池 方便开启和执行多线程 此处为了方便,直接使用 newFixedThreadPoolExecutorService exs = Executors.newFixedThreadPool(10);//结果集 装载在list里面List<Integer> list = new ArrayList<>();List<Future<Integer>> futureList = new ArrayList<>();try {//1.提交10个任务,每个任务返回一个Future入futureList 装载起来 // 这样10个线程就并行去处理和计算了for (int i = 0; i < 10; i++) {futureList.add(exs.submit(new CallableTask(i + 1)));}Long getResultStart = Instant.now().toEpochMilli();System.out.println("结果归集开始时间=" + LocalDateTime.now());//2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除while (futureList.size() > 0) {Iterator<Future<Integer>> iterable = futureList.iterator();//遍历 轮询while (iterable.hasNext()) {Future<Integer> future = iterable.next();//如果任务完成就立马取结果,并且,并且把该任务直接从futureList移除掉 否则判断下一个任务是否完成if (future.isDone() && !future.isCancelled()) {//获取结果Integer i = future.get();System.out.println("任务i=" + i + "获取完成,移出任务队列!" + LocalDateTime.now());//把结果装入进去 然后把futrue任务移除list.add(i);iterable.remove();} else {Thread.sleep(1);//避免CPU高速运转(这就是轮询的弊端),这里休息1毫秒,CPU纳秒级别}}}System.out.println("list=" + list); //任务的处理结果System.out.println("总耗时=" + (System.currentTimeMillis() - start) + ",取结果归集耗时=" + (System.currentTimeMillis() - getResultStart));} catch (Exception e) {e.printStackTrace();} finally {exs.shutdown();}}// 任务 采用sleep模拟处理任务需要消耗的时间static class CallableTask implements Callable<Integer> {Integer i; //用来编号任务 方便日志里输出识别public CallableTask(Integer i) {super();this.i = i;}@Overridepublic Integer call() throws Exception {if (i == 1) {Thread.sleep(3000);//任务1耗时3秒} else if (i == 5) {Thread.sleep(5000);//任务5耗时5秒} else {Thread.sleep(1000);//其它任务耗时1秒}System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!" + LocalDateTime.now());return i;}}}

如上图,开启定长为10的线程池:ExecutorService exs = Executors.newFixedThreadPool(10);+任务1耗时3秒,任务5耗时5秒,其他1秒。控制台打印如下:

结果归集开始时间=-08-28T13:03:23.364task线程:pool-1-thread-6任务i=6,完成!-08-28T13:03:24.252task线程:pool-1-thread-4任务i=4,完成!-08-28T13:03:24.252task线程:pool-1-thread-2任务i=2,完成!-08-28T13:03:24.252task线程:pool-1-thread-7任务i=7,完成!-08-28T13:03:24.252task线程:pool-1-thread-8任务i=8,完成!-08-28T13:03:24.252task线程:pool-1-thread-9任务i=9,完成!-08-28T13:03:24.252task线程:pool-1-thread-10任务i=10,完成!-08-28T13:03:24.252task线程:pool-1-thread-3任务i=3,完成!-08-28T13:03:24.252任务i=8获取完成,移出任务队列!-08-28T13:03:24.252任务i=9获取完成,移出任务队列!-08-28T13:03:24.253任务i=10获取完成,移出任务队列!-08-28T13:03:24.253任务i=2获取完成,移出任务队列!-08-28T13:03:24.254任务i=3获取完成,移出任务队列!-08-28T13:03:24.254任务i=4获取完成,移出任务队列!-08-28T13:03:24.254任务i=6获取完成,移出任务队列!-08-28T13:03:24.255任务i=7获取完成,移出任务队列!-08-28T13:03:24.256task线程:pool-1-thread-1任务i=1,完成!-08-28T13:03:26.254任务i=1获取完成,移出任务队列!-08-28T13:03:26.256task线程:pool-1-thread-5任务i=5,完成!-08-28T13:03:28.254任务i=5获取完成,移出任务队列!-08-28T13:03:28.256list=[8, 9, 10, 2, 3, 4, 6, 7, 1, 5]总耗时=5016,取结果归集耗时=5005

可观察到list最后总是1和5,因为代码里面模拟这两个任务耗时最长。

1.3 小结

直接使用Futrue可实现基本目标,任务并行且按照完成顺序获取结果。

使用很普遍,老少皆宜,就是CPU有消耗,可以使用!

2 FutureTask

2.1 概述

FutureTask是接口RunnableFuture的实现类,实现了Future+Runnable

Runnable接口,可开启单个线程执行。Future接口,可接受Callable接口的返回值,futureTask.get()阻塞获取结果。

2.2 Demo

2.2.1 Demo1

开启单个线程执行任务阻塞等待执行结果

分离这两步骤,可在这两步中间穿插别的相关业务逻辑。

/*** FutureTask弥补了Future必须用线程池提交返回Future的缺陷,实现功能如下:* 这两个步骤:一个开启线程执行任务,一个阻塞等待执行结果,分离这两步骤,可在这两步中间穿插别的相关业务逻辑。** @author fangshixiang@* @description //* @date /10/31 11:15*/public class FutureTaskContorlDemo {public static void main(String[] args) {try {System.out.println("=====例如一个统计公司总部和分部的总利润是否达标100万==========");//利润 记录总公司的利润综合Integer count = 0;//1.定义一个futureTask,假设去远程http获取各个分公司业绩(任务都比较耗时).FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask());Thread futureTaskThread = new Thread(futureTask);futureTaskThread.start();System.out.println("futureTaskThread start!" + new Date());//2.主线程先做点别的事System.out.println("主线程查询总部公司利润开始时间:" + new Date());Thread.sleep(5000);count += 10; //10表示北京集团总部利润。System.out.println("主线程查询总部公司利润结果时间:" + new Date());//总部已达标100万利润,就不再继续执行获取分公司业绩任务了if (count >= 100) {System.out.println("总部公司利润达标,取消futureTask!" + new Date());futureTask.cancel(true);//不需要再去获取结果,那么直接取消即可} else {System.out.println("总部公司利润未达标,进入阻塞查询分公司利润!" + new Date());// 3总部未达标.阻塞获取,各个分公司结果 然后分别去获取分公司的利润Integer i = futureTask.get();//真正执行CallableTaskSystem.out.println("i=" + i + "获取到结果!" + new Date());}} catch (Exception e) {e.printStackTrace();}}// 模拟一个十分耗时的任务 去所有的分公司里去获取利润结果static class CallableTask implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("CallableTask-call,查询分公司利润,执行开始!" + new Date());Thread.sleep(10000);System.out.println("CallableTask-call,查询分公司利润,执行完毕!" + new Date());return 10;}}}

FutureTask这个任务,是Thread.start()的时候就开始执行了的。

而结果是futureTask#get()的时候才会给你(如果提前完成,结果也会先给你缓存;否则get就会阻塞直到有结果了)

注意:倘若你的任务里抛出了异常。那么get方法就会报错从而中断主线程(相当于不需要返回值的异步执行~),但如果你不调用get方法,主线程是不会中断的。

输出:

=====例如一个统计公司总部和分部的总利润是否达标100万==========futureTaskThread start!Fri Aug 28 13:30:48 CST CallableTask-call,查询分公司利润,执行开始!Fri Aug 28 13:30:48 CST 主线程查询总部公司利润开始时间:Fri Aug 28 13:30:48 CST 主线程查询总部公司利润结果时间:Fri Aug 28 13:30:53 CST 总部公司利润未达标,进入阻塞查询分公司利润!Fri Aug 28 13:30:53 CST CallableTask-call,查询分公司利润,执行完毕!Fri Aug 28 13:30:58 CST i=10获取到结果!Fri Aug 28 13:30:58 CST

如上,分离之后,futureTaskThread耗时10秒,在此期间主线程利用间隙执行了耗时5秒的查询总部公司利润操作,大大减少了总耗时。且可根据业务逻辑实时判断是否需要继续执行futureTask。

2.2.Demo2-归集多任务

FutureTask一样可以并发执行任务并获取结果,如下:

/*** FutureTask实现多线程并发执行任务并取结果归集** @author fangshixiang@* @description //* @date /10/31 11:26*/public class FutureTaskDemo {public static void main(String[] args) {Long start = System.currentTimeMillis();ExecutorService exs = Executors.newFixedThreadPool(10);//结果集List<Integer> list = new ArrayList<>();List<FutureTask<Integer>> futureList = new ArrayList<>();try {//启动线程池 和上面Futrue对比,只有这块有点不一样for (int i = 0; i < 10; i++) {FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask(i + 1));//提交任务,添加返回,Runnable特性exs.submit(futureTask);//Future特性 提交任务后 把futureTask添加进futureListfutureList.add(futureTask);}Long getResultStart = System.currentTimeMillis();System.out.println("结果归集开始时间=" + new Date());//结果归集while (futureList.size() > 0) {Iterator<FutureTask<Integer>> iterable = futureList.iterator();//遍历一遍while (iterable.hasNext()) {Future<Integer> future = iterable.next();if (future.isDone() && !future.isCancelled()) {//Future特性Integer i = future.get();System.out.println("任务i=" + i + "获取完成,移出任务队列!" + new Date());list.add(i);//任务完成移除任务iterable.remove();} else {//避免CPU高速轮循,可以休息一下。Thread.sleep(1);}}}System.out.println("list=" + list);System.out.println("总耗时=" + (System.currentTimeMillis() - start) + ",取结果归集耗时=" + (System.currentTimeMillis() - getResultStart));} catch (Exception e) {e.printStackTrace();} finally {exs.shutdown();}}static class CallableTask implements Callable<Integer> {Integer i;public CallableTask(Integer i) {super();this.i = i;}@Overridepublic Integer call() throws Exception {if (i == 1) {Thread.sleep(3000);//任务1耗时3秒} else if (i == 5) {Thread.sleep(5000);//任务5耗时5秒} else {Thread.sleep(1000);//其它任务耗时1秒}System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!" + new Date());return i;}}}

输出:

结果归集开始时间=Fri Aug 28 13:40:55 CST task线程:pool-1-thread-3任务i=3,完成!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-4任务i=4,完成!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-2任务i=2,完成!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-7任务i=7,完成!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-6任务i=6,完成!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-9任务i=9,完成!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-10任务i=10,完成!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-8任务i=8,完成!Fri Aug 28 13:40:56 CST 任务i=7获取完成,移出任务队列!Fri Aug 28 13:40:56 CST 任务i=8获取完成,移出任务队列!Fri Aug 28 13:40:56 CST 任务i=9获取完成,移出任务队列!Fri Aug 28 13:40:56 CST 任务i=10获取完成,移出任务队列!Fri Aug 28 13:40:56 CST 任务i=2获取完成,移出任务队列!Fri Aug 28 13:40:56 CST 任务i=3获取完成,移出任务队列!Fri Aug 28 13:40:56 CST 任务i=4获取完成,移出任务队列!Fri Aug 28 13:40:56 CST 任务i=6获取完成,移出任务队列!Fri Aug 28 13:40:56 CST task线程:pool-1-thread-1任务i=1,完成!Fri Aug 28 13:40:58 CST 任务i=1获取完成,移出任务队列!Fri Aug 28 13:40:58 CST task线程:pool-1-thread-5任务i=5,完成!Fri Aug 28 13:41:00 CST 任务i=5获取完成,移出任务队列!Fri Aug 28 13:41:00 CST list=[7, 8, 9, 10, 2, 3, 4, 6, 1, 5]总耗时=5017,取结果归集耗时=5005

2.3 小结

demo1在特定场合例如有十分耗时的业务但有依赖于其他业务不一定非要执行的,可以尝试使用。

demo2多线程并发执行并结果归集,这里多套一层FutureTask比较鸡肋(直接返回Future简单明了)不建议使用。

3 CompletionService

3.1 概述

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果,怎么办呢?

Java8之前的做法是返回Futrue,然后调用其get阻塞方法即可。这样做固然可以,但却相当乏味。幸运的是,Java8提供了一个更好的方法:完成服务 (CompletionService)。

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个使用BlockingQueue打包的Future。

CompletionService是Java8的新增接口,JDK为其提供了一个实现ExecutorCompletionService

原理:

内部通过BlockingQueue+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。

3.2 Demo

/*** CompletionService多线程并发任务结果归集** @author fangshixiang@* @description //* @date /10/31 11:29*/public class CompletionServiceDemo {public static void main(String[] args) {Long start = System.currentTimeMillis();//开启10个线程ExecutorService exs = Executors.newFixedThreadPool(10);//结果集List<Integer> list = new ArrayList<>();List<Future<Integer>> futureList = new ArrayList<>();try {int taskCount = 10;//1.定义CompletionService ExecutorCompletionService是此接口的唯一实现类 需要把线程池传进去CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);//2.添加任务(向CompletionService添加任务 然后把返回的futrue添加到futureList即可)for (int i = 0; i < taskCount; i++) {futureList.add(completionService.submit(new Task(i + 1)));}//==================结果归集===================//方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果 (若是按照提交顺序,那和Futrue的Demo结果将一样,没啥优势可言)// for (Future<Integer> future : futureList) {//System.out.println("====================");//Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照//System.out.println("任务result="+result+"获取到结果!"+new Date());//list.add(result);// }// //方法2.使用内部阻塞队列的take():内部维护阻塞队列,任务先完成的先获取到for (int i = 0; i < taskCount; i++) {Integer result = completionService.take().get();//采用completionService.take(),System.out.println("任务i==" + result + "完成!" + new Date());list.add(result);}System.out.println("list=" + list);System.out.println("总耗时=" + (System.currentTimeMillis() - start));} catch (Exception e) {e.printStackTrace();} finally {exs.shutdown();//关闭线程池}}static class Task implements Callable<Integer> {Integer i;public Task(Integer i) {super();this.i = i;}@Overridepublic Integer call() throws Exception {if (i == 5) {Thread.sleep(5000);} else {Thread.sleep(1000);}System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + i + ",执行完成!");return i;}}}

输出:

线程:pool-1-thread-3任务i=3,执行完成!线程:pool-1-thread-1任务i=1,执行完成!线程:pool-1-thread-2任务i=2,执行完成!任务i==3完成!Fri Aug 28 15:18:28 CST 任务i==2完成!Fri Aug 28 15:18:28 CST 任务i==1完成!Fri Aug 28 15:18:28 CST 线程:pool-1-thread-4任务i=4,执行完成!线程:pool-1-thread-6任务i=6,执行完成!任务i==4完成!Fri Aug 28 15:18:28 CST 线程:pool-1-thread-8任务i=8,执行完成!任务i==6完成!Fri Aug 28 15:18:28 CST 任务i==8完成!Fri Aug 28 15:18:28 CST 线程:pool-1-thread-7任务i=7,执行完成!线程:pool-1-thread-10任务i=10,执行完成!线程:pool-1-thread-9任务i=9,执行完成!任务i==7完成!Fri Aug 28 15:18:28 CST 任务i==10完成!Fri Aug 28 15:18:28 CST 任务i==9完成!Fri Aug 28 15:18:28 CST 线程:pool-1-thread-5任务i=5,执行完成!任务i==5完成!Fri Aug 28 15:18:32 CST list=[3, 2, 1, 4, 6, 8, 7, 10, 9, 5]总耗时=5017

3.3 小结

CompletionService使用率也挺高,而且能按照线程完成先后排序。建议如果有排序需求的优先使用。

只是多线程并发执行任务结果归集,也可以使用,类似Future。

4 CompletableFuture

4.1 基本概念

4.1.1 概述

CompletableFuture主要是用于异步调用,内部封装了线程池,可以将请求或者处理过程,进行异步处理。

CompletableFuture是JDK1.8才新加入的一个实现类,实现了Future,CompletionStage2个接口(该接口也是1.8才提供)

当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。当2个以上线程同时尝试 正常完成、异常完成、取消一个CompletableFuture时,只有一个线程能成功。

以生活中的一个例子来说明异步行为:电饭煲蒸饭:

Runnable-同步阻塞

以前呀,都是大锅饭,放上米,放上水,然后需要不断地加柴火,人要看着火,具体什么时候煮熟,也得偶尔打开看看,看看开没开锅,煮没煮熟。

这种就是没有任何通知方式,没有返回值的Runnable,只管煮饭,煮没煮熟需要自己判断。

Future-同步非阻塞

一个老板发现了这个商机,说能不能做一个东西,不用人一直看着,自动就能把米饭做好,所以电饭煲就出现了。 初代电饭煲的出现,算是解放了人力,再也不用看着火了,方便了很多,自己可以去做点其他的事情,热个牛奶,剪个鸡蛋什么的。

但是至于饭什么时候熟,还得自己隔一段时间就得过去看一看。

这就是Future的方式,虽然任务是异步执行的,但是要想获得这个结果,还得需要自己取。

CompletableFuture-异步非阻塞

时间继续推进,这个老板又有了新的想法,每隔一段时间,看看饭熟没熟还是有点浪费我看电视的时间,这个电饭煲能不能做好了直接告诉我呢,这样我就直接来吃就行了。

因此就有了这种可以预约、可以定时、可以保温、可以好了后通知的高级电饭煲。

这个就对应着CompletableFuture,所有事情都是可以自动完成,即可以在完成之后回调通知,也可以自己去等待。

4.1.2 实现接口

4.1.2.1 CompletionStage

CompletableFuture实现了CompletionStage接口的如下策略:

为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作没有显式入参Executor的所有async方法都使用monPool(),这是为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖

4.1.2.2 Future

Runnable就是没有返回结果的行为。Callable是有返回结果的行为。Future 异步封装Callable和Runnable,委托给线程池执行后,需要取回执行的结果CompletableFuture 封装了Future,使其拥有了回调的功能,在某个行为完成之后,可以继续进行下一个动作。

public static void main(String[] args){CompletableFuture future = CompletableFuture.supplyAsync(() -> {System.out.println("电饭煲开始做饭");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "白米饭";}).thenAccept(result -> {System.out.println("开始吃米饭");});System.out.println("我先去搞点牛奶和鸡蛋");future.join();}

输出结果如下:

电饭煲开始做饭我先去搞点牛奶和鸡蛋开始吃米饭

这样使用CompletableFuture就可以一边等待米饭煮熟,一边去做其他事情。而且一旦米饭好了,会自动开始通知吃米饭。

CompletableFuture实现了Future接口的如下策略:

CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。以一个CompletionException为例,方法get()get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()getNow,而不是直接在这些情况中直接抛出CompletionException

4.1.3 方法介绍

4.1.3.1 概述

JDK8新增接口,此接口包含38个方法…是的,你没看错,就是38个方法。这些方法主要是为了支持函数式编程中流式处理。

将其划分为以下几类:

创建类

以下方法可以创建CompletableFuture<T>,T为返回值类型

completeFuture 可以用于创建默认返回值runAsync 异步执行,无返回值supplyAsync 异步执行,有返回值anyOf

当任意一个CompletableFuture执行完后,就可以进行下一步动作allOf

当所有的CompletableFuture都执行完后,才可以进行下一步任务completedFuture

得到一个拥有传入的U value的已完成的CompletableFuture<U>

状态取值类

join 合并结果,等待get 合并等待结果,可以增加超时时间;get和join区别,join只会抛出unchecked异常,get会返回具体的异常getNow 如果结果计算完成或者异常了,则返回结果或异常;否则,返回valueIfAbsent的值isCancelledisCompletedExceptionallyisDone

控制类

用于主动控制CompletableFuture的完成行为

complete

如果CompletableFuture未完成,则设定get等方法的返回值为指定值

返回值为boolean,true时代表本方法调用使得CompletableFuture转为了completed状态

completeExceptionally

如果CompletableFuture未完成,则设定get等方法调用时抛出指定异常

返回值为boolean,true时代表本方法调用使得CompletableFuture转为了completed状态

cancel

如果CompletableFuture未完成,则设定get等方法调用时抛出CancellationException

注意:参数并没有实际意义

接续类

CompletableFuture 最重要的特性,没有这个的话,CompletableFuture就没意义了,用于注入回调行为。

thenApply, thenApplyAsync

thenApply接受一个函数,thenCompose是接受一个future实例,更适合处理流操作。thenApply()和thenCompose()的异同见 /p/62d93b249628

thenAccept, thenAcceptAsync

thenRun, thenRunAsync

thenCombine, thenCombineAsync

连接两个CompletableFuture,当两个CompletionStage都执行完成后,把结果一块交给thenCombineAsync来进行消耗

还需要传入BiFunction<? super T,? super U,? extends V> fn)T为首个CompletableFuture返回泛型,CompletableFuture的参数,U为要连接的CompletableFuture的泛型,V为连接后返回的CompletableFuture<V>

thenAcceptBoth, thenAcceptBothAsync

连接两个CompletableFuture,当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗

还需要传入BiConsumer<? super T, ? super U> actionT为首个CompletableFuture返回泛型,CompletableFuture的参数,U为要连接的CompletableFuture的泛型,无返回值(CompletionStage<Void>

runAfterBoth, runAfterBothAsync

两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)

applyToEither, applyToEitherAsync

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作

参数为CompletionStage<? extends T> other, Function<? super T, U> fn,返回值为CompletableFuture<U>

acceptEither, acceptEitherAsync

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作

参数为CompletionStage<? extends T> other, Consumer<? super T> action,返回值为CompletableFuture<Void>

runAfterEither, runAfterEitherAsync

两个CompletionStage,谁执行返回的结果快,就开始进行下一步的Runnable操作

参数为CompletionStage<?> other, Runnable action,返回值为CompletableFuture<Void>

thenCompose, thenComposeAsync

允许你对多个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

参数为Function<? super T, ? extends CompletionStage<U>> fn,返回值为CompletableFuture<U>

whenComplete, whenCompleteAsync

参数为有两个参数的BiConsumer<? super T, ? super Throwable> action(注意第一个泛型为调用者CompletableFuture<T>的泛型T;第二个泛型为异常Throwable,但没有返回值),返回值为CompletableFuture<T>

handle, handleAsync

参数为有两个参数的BiFunction<? super T, Throwable, ? extends U> fn)(注意第一个泛型为调用者CompletableFuture<T>的泛型T;第二个泛型为异常Throwable,第三个泛型为返回值CompletableFuture<U>的泛型U),返回值为CompletableFuture<U>

exceptionally

参数为Function<Throwable, ? extends T> fn第一个泛型为前驱CF抛出的异常(如果抛出了异常),第二个泛型为前驱CF已经定义的返回值泛型。

当前一个CF因为抛出异常完成时,就触发本方法定义的function,可对异常进行处理,返回指定值;否则exceptionally方法内定义的内容无效,仍然返回前驱cf的返回值。

前置动作必须的是一个有返回值的操作,不能是那种CompletableFuture<Void>返回值的.

方法命名规则:

以Async结尾的方法,都是异步方法,即使用额外的线程执行而不是当前线程;对应的没有Async则是同步方法即该方法将继续在已有的线程中执行;一般都是一个异步方法对应一个同步方法。

并且,以Async后缀结尾的方法,都有两个重载的方法,一个是使用默认的forkjoin线程池(monPool()),一种是使用用户传入的自定义线程池

以run开头或者结尾的方法,其入口参数一定是无参的(Runnable),没有返回值(CompletableFuture<Void>),类似于执行Runnable方法。

以supply开头的方法,其参数一定是无入参只有出参的(Supplier<U>),有返回值(CompletableFuture<U>

以Accept开头或者结尾的方法,入口参数为Consumer<? super T>即只有入参类型无出参,但是没有返回值(CompletableFuture<Void>

以Apply开头或者结尾的方法,参数有入参和出参(Function<? super T, U>),有返回值(CompletableFuture<U>

带有either后缀的方法,表示谁先完成就消费谁

4.1.3.2 创建CompletableFuture

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}

如上,其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认monPool()系统级公共线程池,注意:这些线程都是Daemon线程。

使用例子:

// 异步任务,无返回值,采用内部的forkjoin线程池CompletableFuture c1 = CompletableFuture.runAsync(()->{System.out.println("打开开关,开始制作,就不用管了")});// 异步任务,无返回值,使用自定义的线程池CompletableFuture c11 = CompletableFuture.runAsync(()->{System.out.println("打开开关,开始制作,就不用管了")},newSingleThreadExecutor());// 异步任务,有返回值,使用内部默认的线程池CompletableFuture<String> c2 = CompletableFuture.supplyAsync(()->{System.out.println("清洗米饭");return "干净的米饭";});// 只要有一个完成,则完成,有一个抛出异常,则携带异常CompletableFuture<Object> c3 = CompletableFuture.anyOf(c1,c2);// 必须等待所有的future全部完成才可以CompletableFuture<Void> c4 = CompletableFuture.allOf(c1,c2);

4.1.3.3 取值与状态

常用的是下面的这几种// 阻塞等待获取结果// 有异常时会抛出T result = future.join()// 阻塞的等待,无限等待// 有异常则抛出异常T result = future.get()// 有异常则抛出异常,// 以下指定最长等待获取结果1个小时,一个小时之后,如果还没有数据,则抛出TimeoutException异常。T result = future.get(1,TimeUnit.Hours)

4.1.3.4 控制CompletableFuture执行

// 如果CompletableFuture未完成,则设定get等方法的返回值为指定值// 返回值为boolean,true时代表本方法调用使得CompletableFuture转为了completed状态plete("米饭");// 如果CompletableFuture未完成,则设定get等方法调用时抛出指定异常// 返回值为boolean,true时代表本方法调用使得CompletableFuture转为了completed状态pleteExceptionally(new Throwable(...));// 如果CompletableFuture未完成,则设定get等方法调用时抛出CancellationException// 参数并没有实际意义future.cancel(false);

4.1.3.5 接续行为

4.1.3.5.1 概述

用来描述上一件事做完之后,该做什么。

可以总结为三类:

CompletableFuture + (Runnable,Consumer,Function)CompletableFuture + CompletableFutureCompletableFuture + 处理结果

4.1.3.5.2 接续方式1

// supplyAsync 无入参只有出参CompletableFuture future = CompletableFuture.supplyAsync(()->{System.out.println("投放和清洗制作米饭的材料");return "干净的没有新冠病毒的大米";}).thenAcceptAsync(result->{// thenAcceptAsync 只有入参无出参,且放在另外线程执行System.out.println("通电,设定模式,开始煮米饭");}).thenRunAsync(()->{// 传入Runnable,无返回(CompletableFuture<Void>)System.out.println("米饭做好了,可以吃了");})

4.1.3.5.3 接续方式2

假如蒸米饭和、热牛奶、炒菜等已经是3个不同的CompletableFuture,可以使用接续方式2,将两个或者多个CompletableFuture组合在一起使用。

CompletableFuture rice = CompletableFuture.supplyAsync(()->{System.out.println("开始制作米饭,并获得煮熟的米饭");return "煮熟的米饭";})//煮米饭的同时呢,我又做了牛奶CompletableFuture<String> mike = CompletableFuture.supplyAsync(()->{System.out.println("开始热牛奶,并获得加热的牛奶");return "加热的牛奶";});// 我想两个都好了,才吃早饭,// thenCombineAsync有入参(两个CompletableFuture),有返回值CompletableFuture<V>mike.thenCombineAsync(rice,(m,r)->{System.out.println("早饭全都做好了:"+m+","+r);return m+r;})// 有入参,无返回值 CompletableFuture<Void>mike.thenAcceptBothAsync(rice,(m,r)->{System.out.println("早饭做好了:"+m+","+r); });// 无入参和出参 CompletableFuture<Void>mike.runAfterBothAsync(rice,()->{System.out.println("早饭做好了"); });// 或者直接连接两个CompletableFuture// 将第一个stage结果作为参数给第二个stage,// ,返回值为`CompletableFuture<U>`,U为第二个Stage决定rice.thenComposeAsync(r->CompletableFuture.supplyAsync(()->{System.out.println("开始煮牛奶");System.out.println("同时开始煮米饭");return "mike";}))

4.1.3.5.4 接续方式3

如果我们只想做结果处理,也没有其他的接续动作,并且我们想要判断异常的情况,那么可以用接续方式3

whenComplete, whenCompleteAsync

处理完成或异常,参数为有两个参数的BiConsumer<? super T, ? super Throwable> action(注意第一个泛型为调用者CompletableFuture<T>的泛型T;第二个泛型为异常Throwable,但没有返回值),返回值为CompletableFuture<T>handle, handleAsync

参数为有两个参数的BiFunction<? super T, Throwable, ? extends U> fn)(注意第一个泛型为调用者CompletableFuture<T>的泛型T;第二个泛型为异常Throwable,第三个泛型为返回值CompletableFuture<U>的泛型U),返回值为CompletableFuture<U>

// 不能处理返回值CompletableFuture.supplyAsync(()->{System.out.println("开始蒸米饭");return "煮熟的米饭";}).whenCompleteAsync((rich,exception)->{if (exception!=null){System.out.println("电饭煲坏了,米饭没做熟");}else{System.out.println("米饭熟了,可以吃了");}})// 可处理返回值CompletableFuture.supplyAsync(()->{System.out.println("开始蒸米饭");return "煮熟的米饭";}).handleAsync((rich,exception)->{if (exception!=null){System.out.println("电饭煲坏了,米饭没做熟");}else{System.out.println("米饭熟了,可以吃了");}return "准备冷一冷再吃米饭";})// 异常处理CompletableFuture.supplyAsync(()->{System.out.println("开始蒸米饭");return "煮熟的米饭";}).handleAsync((rich,exception)->{if (exception!=null){System.out.println("电饭煲坏了,米饭没做熟");}else{System.out.println("米饭熟了,可以吃了");}return "准备冷一冷再吃米饭";}).exceptionally((exception)->{// 前置动作必须的是一个有返回值的操作,不能是那种CompletableFuture<Void>返回值的return "";});

4.1.3.3 组合CompletableFuture

thenCombine():

先完成当前CompletionStage和other CompletionStage 2个任务,然后把结果传参给BiFunction进行结果合并操作

三个重载方法如下:

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);}public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(asyncPool, other, fn);}public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) {return biApplyStage(screenExecutor(executor), other, fn);}

thenCompose()

第一个CompletableFuture执行完毕后,传递给下一个CompletionStage作为入参进行操作。

三个重载方法如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);}public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(asyncPool, fn);}public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}

更多可参考【小家java】Java8新特性之—CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)

4.2 Demo

4.2.1 allOf anyOf

JDK8的CompletableFuture 自带多任务组合方法allOf和anyOf

allOf是等待所有任务完成,构造后CompletableFuture完成anyOf是只要有一个任务完成,构造后CompletableFuture就完成

4.2.1.1 Demo1

Random rand = new Random();CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 100;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return "abc";});//CompletableFuture<Void> f = CompletableFuture.allOf(future1,future2);CompletableFuture<Object> f = CompletableFuture.anyOf(future1,future2);System.out.println(f.get());

上面的代码运行结果有时是100,有时是”abc”。但是anyOf和applyToEither不同。anyOf接受任意多的CompletableFuture但是applyToEither只是判断两个CompletableFuture,anyOf返回值的计算结果是参数中其中一个CompletableFuture的计算结果,applyToEither返回值的计算结果却是要经过fn处理的。当然还有静态方法的区别,线程池的选择等。

4.2.1.2 Demo2

方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取

package demos.concurrent.pare;import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.List;import java.pletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.stream.Collectors;/*** 多线程并发任务,取结果归集** @author fangshixiang@* @description //* @date /10/31 11:53*/public class CompletableFutureDemo {public static void main(String[] args) {Long start = System.currentTimeMillis();//定长10线程池ExecutorService exs = Executors.newFixedThreadPool(10);// 存放的是按线程执行顺序完成的结果List<String> list2 = new ArrayList<>();List<CompletableFuture<String>> futureList = new ArrayList<>();final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);try {//方式一:循环创建CompletableFuture list, 然后组装 组装返回一个有返回值的CompletableFuture,返回结果get()获取for (int i = 0; i < taskList.size(); i++) {final int j = i;//异步执行 拿到每个有返回值的CompletableFuture对象CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> calc(taskList.get(j)), exs)// 完成后将返回值传入thenApply指定的Function.thenApply(String::valueOf)//如需获取任务完成先后顺序,此处代码即可.whenComplete((v, e) -> {System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());// 存放的是按线程执行顺序完成的结果list2.add(v);});futureList.add(future);}// 此时任务没执行完,所以list2为空System.out.println(list2);//流式获取结果:此处是根据任务添加顺序获取的结果========================// 1.构造一个返回空的CompletableFuture// 将所有Future都放入allDoneFuture// allOf是等待所有任务完成CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureList.stream().filter(f -> f != null).collect(Collectors.toList()).toArray(new CompletableFuture[futureList.size()]));//2. 等待总任务完成,然后使用join等待每个子任务取结果,最后转换为list,放入结果集List<String> list = allDoneFuture.thenApply(voidReturn -> futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()))// 这里是返回thenApply内部的函数执行结果,即List<String>.get();//流式获取结果:此处是根据任务添加顺序获取的结果========================System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start));} catch (Exception e) {e.printStackTrace();} finally {exs.shutdown();}}//模拟任务的耗时方法public static Integer calc(Integer i) {try {if (i == 1) {//任务1耗时3秒Thread.sleep(3000);} else if (i == 5) {//任务5耗时5秒Thread.sleep(5000);} else {//其它任务耗时1秒Thread.sleep(1000);}System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());} catch (InterruptedException e) {e.printStackTrace();}return i;}}

输出:

[]task线程:pool-1-thread-1任务i=2,完成!+Fri Aug 28 16:14:40 CST 任务2完成!result=2,异常 e=null,Fri Aug 28 16:14:40 CST task线程:pool-1-thread-4任务i=4,完成!+Fri Aug 28 16:14:40 CST task线程:pool-1-thread-3任务i=3,完成!+Fri Aug 28 16:14:40 CST 任务4完成!result=4,异常 e=null,Fri Aug 28 16:14:40 CST 任务3完成!result=3,异常 e=null,Fri Aug 28 16:14:40 CST task线程:pool-1-thread-6任务i=6,完成!+Fri Aug 28 16:14:40 CST task线程:pool-1-thread-8任务i=8,完成!+Fri Aug 28 16:14:40 CST task线程:pool-1-thread-7任务i=7,完成!+Fri Aug 28 16:14:40 CST 任务8完成!result=8,异常 e=null,Fri Aug 28 16:14:40 CST 任务6完成!result=6,异常 e=null,Fri Aug 28 16:14:40 CST task线程:pool-1-thread-10任务i=10,完成!+Fri Aug 28 16:14:40 CST task线程:pool-1-thread-9任务i=9,完成!+Fri Aug 28 16:14:40 CST 任务10完成!result=10,异常 e=null,Fri Aug 28 16:14:40 CST 任务7完成!result=7,异常 e=null,Fri Aug 28 16:14:40 CST 任务9完成!result=9,异常 e=null,Fri Aug 28 16:14:40 CST task线程:pool-1-thread-2任务i=1,完成!+Fri Aug 28 16:14:42 CST 任务1完成!result=1,异常 e=null,Fri Aug 28 16:14:42 CST task线程:pool-1-thread-5任务i=5,完成!+Fri Aug 28 16:14:44 CST 任务5完成!result=5,异常 e=null,Fri Aug 28 16:14:44 CST 任务完成先后顺序,结果list2=[2, 4, 3, 8, 6, 10, 7, 9, 1, 5];任务提交顺序,结果list=[2, 1, 3, 4, 5, 6, 7, 8, 9, 10],耗时=5107

4.2.1.3 Demo3

全流式处理转换成CompletableFuture[]+allOf组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取。

推荐使用

import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.List;import java.pletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/*** 多线程并发任务,取结果归集* <p>* 推荐使用此方法** @author fangshixiang@* @description //* @date /10/31 11:53*/public class CompletableFutureDemo2 {public static void main(String[] args) {Long start = System.currentTimeMillis();//定长10线程池ExecutorService exs = Executors.newFixedThreadPool(10);//结果集List<String> list = new ArrayList<>();final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);try {//方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,// join等待执行完毕。// 返回结果whenComplete获取CompletableFuture<Integer>[] cfs = taskList.stream().map(i ->//把计算任务 交给CompletableFuture异步去处理执行CompletableFuture.supplyAsync(() -> calc(i), exs)// 把计算完成结果做Function处理:此处是转换成了字符串.thenApply(String::valueOf)//如需获取任务完成先后顺序,此处代码即可 会先处理先完成的任务 后处理后完成的任务 使用起来比CompletionService确实方便不少.whenComplete((v, e) -> {System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());list.add(v);})).toArray(CompletableFuture[]::new); //此处直接toArray 不toList了//等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取 此处使用join来获取结果CompletableFuture.allOf(cfs).join();System.out.println("任务完成先后顺序,结果list=" + list + ";任务提交顺序,耗时=" + (System.currentTimeMillis() - start));} catch (Exception e) {e.printStackTrace();} finally {exs.shutdown();}}//模拟任务的耗时方法public static Integer calc(Integer i) {try {if (i == 1) {//任务1耗时3秒Thread.sleep(3000);} else if (i == 5) {//任务5耗时5秒Thread.sleep(5000);} else {//其它任务耗时1秒Thread.sleep(1000);}System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());} catch (InterruptedException e) {e.printStackTrace();}return i;}}

输出:

task线程:pool-1-thread-1任务i=2,完成!+Fri Aug 28 16:24:00 CST 任务2完成!result=2,异常 e=null,Fri Aug 28 16:24:00 CST task线程:pool-1-thread-3任务i=3,完成!+Fri Aug 28 16:24:00 CST task线程:pool-1-thread-9任务i=9,完成!+Fri Aug 28 16:24:00 CST 任务3完成!result=3,异常 e=null,Fri Aug 28 16:24:00 CST task线程:pool-1-thread-7任务i=7,完成!+Fri Aug 28 16:24:00 CST task线程:pool-1-thread-4任务i=4,完成!+Fri Aug 28 16:24:00 CST task线程:pool-1-thread-6任务i=6,完成!+Fri Aug 28 16:24:00 CST task线程:pool-1-thread-10任务i=10,完成!+Fri Aug 28 16:24:00 CST task线程:pool-1-thread-8任务i=8,完成!+Fri Aug 28 16:24:00 CST 任务10完成!result=10,异常 e=null,Fri Aug 28 16:24:00 CST 任务6完成!result=6,异常 e=null,Fri Aug 28 16:24:00 CST 任务4完成!result=4,异常 e=null,Fri Aug 28 16:24:00 CST 任务7完成!result=7,异常 e=null,Fri Aug 28 16:24:00 CST 任务9完成!result=9,异常 e=null,Fri Aug 28 16:24:00 CST 任务8完成!result=8,异常 e=null,Fri Aug 28 16:24:00 CST task线程:pool-1-thread-2任务i=1,完成!+Fri Aug 28 16:24:02 CST 任务1完成!result=1,异常 e=null,Fri Aug 28 16:24:02 CST task线程:pool-1-thread-5任务i=5,完成!+Fri Aug 28 16:24:04 CST 任务5完成!result=5,异常 e=null,Fri Aug 28 16:24:04 CST 任务完成先后顺序,结果list=[2, 3, 10, 6, 4, 7, 9, 8, 1, 5];任务提交顺序,耗时=5082

4.2.2 Either

thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,而我们下面要了解的方法是当任意一个CompletableFuture计算完成的时候就会执行

Random rand = new Random();CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 100;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 200;});CompletableFuture<String> f = future.applyToEither(future2,i -> i.toString());

上面这个例子有时会输出100,有时候会输出200,哪个Future先完成就会根据它的结果计算。

4.2.3 煮饭

4.2.3.1 概述

创建CompletableFuture,其实就是将我们要煮的米饭,委托给电饭煲;

要煮米饭,我们要准备这么几件事情:

我们要指定制作米饭的方式,我们要指定电饭煲。除此之外,我们也可以委托其他的事情,最后可以通过all或者any进行组合。

4.3 原理

4.3.1 前置概念

4.3.1.1 UnSafe的应用

Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等。在CompletableFuture中则主要用到了Unsafe提供的CAS操作,来为CompletableFuture的结果赋值,或执行出入栈等操作,来保证其线程安全。关于UnSafe具体可参考Java Magic. Part 4: sun.misc.Unsafe

Unsafe相关私有方法主要用于采用CAS的方式为result引用,以及Treiber stack模型中的stacknext引用赋值:

其中给result赋值的方法有internalCompletecompleteNullcompleteValuecompleteThrowable;给stack赋值的有casStacktryPushStackpushStackpushbipush。给next赋值的为lazySetNext方法。

以下介绍几个给stack赋值的重要的方法:

casStack

casStack是直接给stack赋值。

final boolean casStack(Completion cmp, Completion val) {return pareAndSwapObject(this, STACK, cmp, val);}

tryPushStack

tryPushStack则是尝试把原stack值设为新stack值的next,也即入栈,所以tryPushStack中用到了给next赋值的lazySetNext方法。lazySetNext方法即Treiber stack模型在CompletableFuture中实现的体现。

/** Returns true if successfully pushed c onto stack. */final boolean tryPushStack(Completion c) {Completion h = stack;lazySetNext(c, h);//把stack的旧值h设为stack的新值c的nextreturn pareAndSwapObject(this, STACK, h, c);//cas的把stack设为c}static void lazySetNext(Completion c, Completion next) {UNSAFE.putOrderedObject(c, NEXT, next);}

pushStack

是一直调用tryPushStack直到成功为止。

/** Unconditionally pushes c onto stack, retrying if necessary. */final void pushStack(Completion c) {do {} while (!tryPushStack(c));}

push

push会先判断依赖的上一个CompletableFuture是否已得到结果,若得到结果了就不需要把下一个任务放入栈中等待异步执行了。

/** Pushes the given completion (if it exists) unless done. */final void push(UniCompletion<?,?> c) {if (c != null) {while (result == null && !tryPushStack(c))lazySetNext(c, null); // clear on failure 失败时回滚tryPushStack中的lazySetNext}}

bipush

bipush则会把任务放入两个CompletableFuture的任务栈中,两个CompletableFuture获得结果后都会触发一次是否执行该任务的判断。若两个CompletableFuture都已得到结果则直接返回,后面会同步执行该任务。

/** Pushes completion to this and b unless both done. */final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {if (c != null) {Object r;while ((r = result) == null && !tryPushStack(c)) //放入this的任务栈中lazySetNext(c, null); // clear on failureif (b != null && b != this && b.result == null) {Completion q = (r != null) ? c : new CoCompletion(c);while (b.result == null && !b.tryPushStack(q)) //放入b的任务栈中lazySetNext(q, null); // clear on failure}}}

4.3.1.2 Treiber stack

Treiber stack是一种用CAS实现的线程安全的无锁栈。CompletableFuture采用Treiber stack的原理实现了一个异步任务栈,用于保管CompletableFuture执行完成后将要触发的所有异步Completion任务。

CompletableFuture的result引用指向CompletableFuture的包装结果,stack引用指向Treiber stack的栈顶。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {...volatile Object result; // Either the result or boxed AltResultvolatile Completion stack; // Top of Treiber stack of dependent actions,...}

内部类Completion继承了ForkJoinTask,可以看作一个异步任务,存放在Treiber stack中。其包含了一个next引用,即指向了栈中的下一个Completion可执行元素。关于Treiber stack可参考

abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;// Treiber stack link...}

4.3.1.3 ForkJoinPool

ForkJoinPool是Java1.7引入的一款采用Fork/Join框架的线程池。

在CompletableFuture中作为其默认采用的线程池,用来分配调度线程来执行异步任务,其生成的线程默认为守护线程

CompletableFuture的阻塞功能也由ForkJoinPool.ManagedBlocker进行管理。

private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);/*** Default executor -- monPool() unless it cannot* support parallelism.* * 默认采用ForkJoinPool作为其默认线程池* * CompletableFuture的静态代码会判断是否采用ForkJoinPool作为其默认线程池,* 否则将回退到采用ThreadPerTaskExecutor作为其默认线程池。*/private static final Executor asyncPool = useCommonPool ?monPool() : new ThreadPerTaskExecutor();/** Fallback if monPool() cannot support parallelism */static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) {new Thread(r).start(); }}

4.3.1.4 AltResult

AltResult是CompletableFuture的一个内部类,作为CompletableFuture结果为null或出现异常时的结果返回,是CompletableFuture非正常结果的通用包装对象。

static final class AltResult {// See abovefinal Throwable ex; // null only for NILAltResult(Throwable x) {this.ex = x; }}/** The encoding of the null value. */static final AltResult NIL = new AltResult(null);

4.3.1.5 Completion

Completion在前面已有简要介绍,它是CompletableFuture的一个内部类,其用来代表Treiber stack中的一个通用的任务。它继承了ForkJoinTask并实现了Runnable接口,可以被ForkJoinPool调度执行,也可以由一般线程执行。

// Modes for Completion.tryFire. Signedness matters.static final int SYNC = 0;static final int ASYNC = 1;static final int NESTED = -1;abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;// Treiber stack link/*** Performs completion action if triggered, returning a* dependent that may need propagation, if one exists.** @param mode SYNC, ASYNC, or NESTED*/abstract CompletableFuture<?> tryFire(int mode);/** Returns true if possibly still triggerable. Used by cleanStack. */abstract boolean isLive();public final void run(){tryFire(ASYNC); }public final boolean exec() {tryFire(ASYNC); return true; }public final Void getRawResult() {return null; }public final void setRawResult(Void v) {}}

其中run方法继承自Runnable,exec、getRawResult、setRawResult方法继承自ForkJoinTask。tryFire则是Completion自带的方法,用来尝试执行任务。这里tryFire的参数mode有三个可选值SYNC、ASYNC、和NESTED,代表了tryFire是在何种情形下进行的调用。

可以看到run方法和exec方法在调用tryFire时传入的是ASYNC,代表其调用时是处于异步线程下,这样tryFire内部就不会再把任务放入栈中等待异步线程执行,而是直接执行。关于模式间不同的逻辑后面会详述。

4.3.1.6 UniCompletion

UniCompletionCompletableFuture的一个内部抽象类,继承自Completion,也是Completion最重要的一个子类。

UniCompletion的子类们实现了CompletionStage的方法们的各种逻辑。

其中:

dep代表该任务将要生成的后继CompletableFuture,src代表该任务的前置任务已生成的CompletableFuture。claim方法返回该任务是否能被运行:

当Executor为null时,意味着目前处于同步模式,会返回true,使得在claim方法的外层会直接执行该后继任务。

/** A Completion with a source, dependent, and executor. */abstract static class UniCompletion<T,V> extends Completion {Executor executor; // executor to use (null if none)CompletableFuture<V> dep;// the dependent to completeCompletableFuture<T> src;// source for actionUniCompletion(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src) {this.executor = executor; this.dep = dep; this.src = src;}/*** Returns true if action can be run. Call only when known to* be triggerable. Uses FJ tag bit to ensure that only one* thread claims ownership. If async, starts as task -- a* later call to tryFire will run action.*/final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null) return true;executor = null; // disablee.execute(this);}return false;}final boolean isLive() {return dep != null; }}

dep代表该任务将要生成的CompletableFuture,src代表该任务的前置任务已生成的CompletableFuture。claim方法返回该任务是否能被运行:当Executor为null时,意味着目前处于同步模式,在claim方法的外层会直接执行该任务。

4.3.1.7 Signaller

Signaller是CompletableFuture的一个内部类,也继承自Completion

它用于记录一个已阻塞的任务,get等待结果的阻塞功能由其实现。

此外它实现了ForkJoinPool.ManagedBlocker接口,由ForkJoinPool管理其阻塞和释放。其功能待介绍get方法原理时再详述。

/*** Completion for recording and releasing a waiting thread. This* class implements ManagedBlocker to avoid starvation when* blocking actions pile up in ForkJoinPools.*/static final class Signaller extends Completionimplements ForkJoinPool.ManagedBlocker {long nanos;// wait time if timedfinal long deadline; // non-zero if timedvolatile int interruptControl; // > 0: interruptible, < 0: interruptedvolatile Thread thread;...}

4.3.2 代码流程分析

比如有如下代码:

CompletableFuture.supplyAsync(() -> {// random n millisecondint ms = new Random().nextInt(100);LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(ms));String msg = String.format("supplyAsync %s ms", ms);System.out.println(format(msg));return msg;}).thenApply(new Function<String, Integer>() {@Overridepublic Integer apply(String s) {System.out.println(format("thenApply apply s.length()"));return s.length();}}).whenComplete(new BiConsumer<Integer, Throwable>() {@Overridepublic void accept(Integer s, Throwable throwable) {System.out.println(format("done " + s));}});

先看 supplyAsync

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {// asyncPool 就是 monPool()return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();// 创建了一个新的CompletableFuture,用于返回结果CompletableFuture<U> d = new CompletableFuture<U>();// 将Supply提交到该executore.execute(new AsyncSupply<U>(d, f));// 注意这里返回的是新建的cf1return d;}

接着看看AsyncSupply,可见该类为CompletableFuture的静态内部类,实现了Runnable接口,又继承了ForkJoinTask<Void>

static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; Supplier<T> fn;// 这里就是我们刚刚新建的cf,以及用户定义的SupplierAsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() {return null; }public final void setRawResult(Void v) {}public final boolean exec() {run(); return true; }// 实现自Runnable的run方法主逻辑public void run() {CompletableFuture<T> d; Supplier<T> f;if ((d = dep) != null && (f = fn) != null) {// 注意这里把dep和fn置为空了dep = null; fn = null; if (d.result == null) {// 初始result为空try {// CAS方式将后续cf.result替换为Supplier执行结果pleteValue(f.get());} catch (Throwable ex) {pleteThrowable(ex);}}// 触发后续执行任务,如thenApplyd.postComplete();}}}

以下看看CompletableFuture#postComplete

/*** Pops and tries to trigger all reachable dependents. Call only* when known to be done.*/final void postComplete() {/** On each step, variable f holds current dependents to pop* and run. It is extended along only one path at a time,* pushing others to avoid unbounded recursion.*/CompletableFuture<?> f = this; Completion h;// 初始时,f.stack可能为null,// 也有可能因为后置任务线程先执行而导致f.stack已经有值// 将栈内while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;// 提取栈内下一个Completion元素,并放到栈顶if (f.casStack(h, t = h.next)) {// fif (t != null) {if (f != this) {pushStack(h);continue;}h.next = null; // detach}// 触发原栈顶计算,其实就是将前驱运算结果交给后继节点做为参数运算f = (d = h.tryFire(NESTED)) == null ? this : d;}}}

这里瞅瞅CompletableFuture.stack,看名字应该是个栈,那我们看看之前的代码thenApply、whenComplete是不是放入了这个stack呢:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();// 这里也是构建一个新的带返回值的CompletableFuture,泛型由fn决定CompletableFuture<V> d = new CompletableFuture<V>();// 注意这里的this是前面supplyAsync执行时新建的cf1if (e != null || !d.uniApply(this, f, null)) {// 首次执行会进入这里,因为此时this.result大概率=null,即前驱还未计算完成// 否则直接跳过这里UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);// 将c放入栈顶;注意`stack`对象初始为空push(c);// 尝试触发后继计算c.tryFire(SYNC);}return d;}final <S> boolean uniApply(CompletableFuture<S> a,Function<? super S,? extends T> f,UniApply<S,T> c) {Object r; Throwable x;// 注意这里的a为src cfif (a == null || (r = a.result) == null || f == null)// 首次执行时这里 a.result很可能为空,因为是异步执行的return false;// src cf有运算结果后会走这里tryComplete: if (result == null) {// 这里的result是后继的result,为空,所以会走这里if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {completeThrowable(x, r);break tryComplete;}r = null;}try {// 如果后继方法是异步方法则c为null// 同步方法时c不为null// c.claim是判断是否能运行if (c != null && !c.claim())return false;@SuppressWarnings("unchecked") S s = (S) r;// 这里将src结果传给dep的function执行// 然后cas方式把结果放入dep.resultcompleteValue(f.apply(s));} catch (Throwable ex) {completeThrowable(ex);}}return true;}/** Pushes the given completion (if it exists) unless done. */final void push(UniCompletion<?,?> c) {if (c != null) {// 初始时result为空// tryPushStack 将 c 插入到stack// 随后cas方式原来的stach h 替换为 c// 以便下次新的Completion又能将c作为他的next// 这就形成了栈while (result == null && !tryPushStack(c))// 交换失败后才会进入这里// 失败时,将c.next设为 nulllazySetNext(c, null); // clear on failure}}static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn;UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;}// 尝试触发final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a;if ((d = dep) == null ||// 注意,由于这里后继用的是同步方法,所以mode=0,传的是this(UniApply)// 可以执行后继时,d.uniApply返回true!d.uniApply(a = src, fn, mode > 0 ? null : this))return null;// 可以执行后继时走这里,把UniApply的后继和前驱都置空dep = null; src = null; fn = null;// 触发本cf后继return d.postFire(a, mode);}}final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {// 这里a是前驱 cf,this为后继 cfif (a != null && a.stack != null) {if (mode < 0 || a.result == null)a.cleanStack();elsea.postComplete();}// 本后继cf算出的resultif (result != null && stack != null) {if (mode < 0)// tryFire(NESTED)时走这里// 返回本后继cfreturn this;elsepostComplete();}return null;}

CompletableFuture都会对应有一个stack数据结构,也就说针对同一个CompletableFuture对象添加多个Completion,执行Completion回调是按照其逆序进行执行的;针对多个CompletableFuture的添加Completion,是按照CompletableFuture的添加次序来顺序执行的,对应的测试代码如下:

4.3.2 小结

CompletableFuture的多个操作,也就是多个CompletableFuture之间,如果上一个CompletableFuture未完成,则会将当前CompletableFuture动作添加到上一个CompletableFuture的stack数据结构中,在任务执行完毕之后,回执行对应stack中的Completion回调方法,每个操作基本上都对应有Completion处理类。

整体流程图:

看完文章示例代码,是不是还不太清楚多个CompletableFuture之间的执行流程呢,说实话笔者第一次看的时候也是这样的 😦,下面我们换个例子并给出图示来看:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {System.out.println("hello world f1");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "f1";});CompletableFuture<String> f2 = f1.thenApply(r -> {System.out.println(r);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "f2";});CompletableFuture<String> f3 = f2.thenApply(r -> {System.out.println(r);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "f3";});CompletableFuture<String> f4 = f1.thenApply(r -> {System.out.println(r);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "f4";});CompletableFuture<String> f5 = f4.thenApply(r -> {System.out.println(r);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "f5";});CompletableFuture<String> f6 = f5.thenApply(r -> {System.out.println(r);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "f6";});CompletableFuture.allOf(f3, f6).join();

上面代码对应的CompletableFuture及其Completion关系如下图:

结合上图和postComplete流程,可以看出执行回调的顺序是:f1 -> f4 -> f5 -> f6 -> f2 -> f3。(如果这里没看懂,可以回过头再看下tryFire和postComplete方法的源码~)

4.4 小结

CompletableFuture满足并发执行,可按任务完成先后顺序获取任务结果。

而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。

JDK源生支持,API丰富,推荐使用。

5 Guava Future

如果你用过Guava的Future类,你就会知道它的Futures辅助类提供了很多便利方法,用来处理多个Future,而不像Java的CompletableFuture,只提供了allOf、anyOf两个方法。 比如有这样一个需求,将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的计算结果是个List,它包含前面所有的CompletableFuture的计算结果,guava的Futures.allAsList可以实现这样的功能,但是对于java CompletableFuture,我们需要一些辅助方法:

public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));}public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) {List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());return sequence(futureList);}

或者Java Future转CompletableFuture:

public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {return CompletableFuture.supplyAsync(() -> {try {return future.get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}}, executor);}

github有多个项目可以实现Java CompletableFuture与其它Future (如Guava ListenableFuture)之间的转换,如spotify/futures-extra、future-converter、scala/scala-java8-compat 等。

6 总结

更多好文

java 权威指南_Java 8:CompletableFuture权威指南CompletableFuture 例子

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。