600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 【Java并发编程】Java多线程(四):FutureTask 源码分析

【Java并发编程】Java多线程(四):FutureTask 源码分析

时间:2021-04-17 22:10:00

相关推荐

【Java并发编程】Java多线程(四):FutureTask 源码分析

前言:【Java并发编程】Java多线程(三):Runnable、Callable --创建任务的方式

在上一篇文章的末尾我们通过两个问题,引出了 FutureTask 及其设计思路,先来回顾一下:

问题一:Callable 与 Future都是接口,怎么实现通过 Future 控制 Callable 呢?

答:可以创建一个中间类实现 Future接口,然后将 Callable 实例组合进来,最后通过 Future 接口中的方法实现控制。

问题二:另外,这里还要考虑一个问题,Runnable 和 Callable 都可以表示线程要执行的任务,那么这两个接口如何在不改变原有关系的基础上互相转化?

答:首先,Runnable 与 Callable 肯定不能通过 extends 实现,因为直接继承就是 is-a 的关系,而这俩显然不是。那么还有一个办法,引入一个中间类 F(满足以下两个条件)

F extends Runnable:继承 RunnableF { primary Callable c }:组合 Callable

=>最终,结合上面的所有分析,我们得到了 FutureTask 的结构:实现 Future 接口 & 实现Runnable接口 & 组合Callable

下面我们就来看看 FutureTask 的源码到底是什么样的。

1.FutureTask 结构

public class FutureTask<V> implements RunnableFuture<V> {// 任务状态private volatile int state;private static final int NEW= 0;// 线程任务创建private static final int COMPLETING = 1;// 任务执行中private static final int NORMAL = 2;// 任务执行结束private static final int EXCEPTIONAL = 3;// 任务异常private static final int CANCELLED = 4;// 任务取消成功private static final int INTERRUPTING = 5;// 任务正在被打断中private static final int INTERRUPTED = 6;// 任务被打断成功// 组合了 Callable private Callable<V> callable;// 异步线程返回的结果private Object outcome; // 当前任务所运行的线程private volatile Thread runner;// 记录调用 get 方法时被等待的线程private volatile WaitNode waiters;//---------------------------构造方法---------------------------------// 构造器中传入Callable接口实现对象,对callable成员变量进行初始化public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;// 任务状态初始化this.state = NEW; // ensure visibility of callable}// 使用 Runnable 初始化,并传入 result 作为返回结果。// Runnable 是没有返回值的,所以 result 一般没有用,置为 null 就好了public FutureTask(Runnable runnable, V result) {// Executors.callable 方法把 runnable 适配成 RunnableAdapter,// RunnableAdapter 实现了 callable,所以也就是把 runnable 直接适配成了 callable。this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable}//.......}

这里我们从第二个构造中看到,即使入参是 Runnable,最后也会转化成 callable。我们来看看 Executors.callable() 是怎么做的

可以看到,最后是创建了一个 RunnableAdapter。其实这里是适配器模式:Executors.callable() --> RunnableAdpter

// 转化 Runnable 成 Callable 的工具类// 自己实现Callable接口static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}// 重写call方法,返回result// 调用路径:callable.call() => RunnableAdapter#call() => task.run() => return resultpublic T call() {task.run();return result;}}

2.继承关系分析

再来看 FutureTask 的继承关系

可以看到 FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 又继承了 Future 和 Runnable 接口(看看,是不是和上面说的对上了)。

继承 Future:实现 Future 对 Callable 的控制 在 FutureTask 中可以看到 Callable 实例已经被组合)构造器传入 Callable 实例继承 Runable:将外在变成 Runnable,所以,当构造函数传入 Callable 就直接转化为了 Runnable

public interface RunnableFuture<V> extends Runnable, Future<V> {void run();}

下面我们就分别看看 FutureTask 是如何实现 Future 接口和 Runnable 接口中的方法…

3.Future 接口方法的实现

get()

get 有无限阻塞和带超时时间两种方法,我们通常建议使用带超时时间的方法,源码如下

public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;// 如果任务已经在执行中了,并且等待一定的时间后,仍然在执行中,直接抛出异常if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();// 任务执行成功,返回执行的结果return report(s);}

awaitDone(): 等待任务执行完成

private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 计算等待终止时间,如果一直等待的话,终止时间为 0final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;// 不排队boolean queued = false;// 自旋for (;;) {// 如果线程已经被打断了,删除,抛异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 当前任务状态int s = state;// 当前任务已经执行完了,返回if (s > COMPLETING) {// 当前任务的线程置空if (q != null)q.thread = null;return s;}// 如果正在执行,当前线程让出 cpu,重新竞争,防止 cpu 飙高else if (s == COMPLETING) // cannot time out yetThread.yield();// 如果第一次运行,新建 waitNode,当前线程就是 waitNode 的属性else if (q == null)q = new WaitNode();// 默认第一次都会执行这里,执行成功之后,queued 就为 true,就不会再执行了// 把当前 waitNode 当做 waiters 链表的第一个else if (!queued)queued = pareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 如果设置了超时时间,并过了超时时间的话,从 waiters 链表中删除当前 waitelse if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}// 没有过超时时间,线程进入 TIMED_WAITING 状态LockSupport.parkNanos(this, nanos);}// 没有设置超时时间,进入 WAITING 状态elseLockSupport.park(this);}}

cancel()

取消任务,如果正在运行,尝试去打断

public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&//任务状态不是创建 并且不能把 new 状态置为取消,直接返回 pareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;// 进行取消操作,打断可能会抛出异常,选择 try finally 的结构try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final state// 状态设置成已打断UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 清理线程finishCompletion();}return true;}

3.Runnable接口方法的实现

run()

run 方法可以直接被调用,也可以开启新的线程进行调用

run 方法是没有返回值的,通过给 outcome 属性赋值(set(result)),get 时就能从 outcome 属性中拿到返回值;FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可

public void run() {// 状态不是任务创建,或者当前任务已经有线程在执行了,直接返回if (state != NEW ||!pareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// Callable 不为空,并且已经初始化完成if (c != null && state == NEW) {V result;boolean ran;try {// run() -> c.call() -> RunnableAdapter#call() -> task.run()result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}// 给 outcome 赋值if (ran)set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

总结

最后,总结一下FutureTask 有什么作用:

实现了 Future 的所有方法,对任务有一定的管理功能,比如说拿到任务执行结果,取消任务,打断任务等等。组合了 Callable,实现了 Runnable,把 Callable 和 Runnnable 串联了起来。统一了有参任务和无参任务两种定义方式,方便了使用。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。