600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > futuretask java_Java并发编程一(FutureTask)

futuretask java_Java并发编程一(FutureTask)

时间:2024-02-01 12:59:38

相关推荐

futuretask java_Java并发编程一(FutureTask)

一、前言

创建线程有几种方式?

继承 Thread 类

实现 Runnable 接口

但这两种方式创建的线程是属于”三无产品“:

没有参数

没有返回值

没办法抛出异常

用着 “三无产品” 总是有一些弊端,其中没办法拿到返回值是最让人不能忍的,于是 Callable 就诞生了

二、Callable

@FunctionalInterface

public interface Callable {

V call() throws Exception;

}

@FunctionalInterface

public interface Runnable {

public abstract void run();

}

两者有啥区别么?除了显而易见的 Exception 以外,还有最重要的是 Callable 有返回值。那这个返回值怎么用呢?就涉及到标题中的 Future 了

先看一下 ExecutorService 中的方法

Future submit(Callable task);

Future submit(Runnable task, T result);

Future> submit(Runnable task);

submit都会阻塞主线程,等到执行完毕都会返回一个 Future 对象。区别就在于第一个方法用的是 Callable 的返回值。第二第三个方法用的是 result 或者 null

三、Future

Future 基本方法(这些方法都很好理解,不做过多说明)

public interface Future {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

Future 的类图结构

根据上面的类图,接下来就简单分析一下 FutureTask 和 CompletableFuture(CompletableFuture 放到下一篇)

四、FutureTask

1、重要变量

/**

* 有以下四种状态变化

* NEW -> COMPLETING -> NORMAL

* NEW -> COMPLETING -> EXCEPTIONAL

* NEW -> CANCELLED

* NEW -> INTERRUPTING -> INTERRUPTED

*/

private volatile int state;

private static final int NEW = 0;

private static final int COMPLETING = 1;

private static final int NORMAL = 2;

private static final int EXCEPTIONAL = 3;

private static final int CANCELLED = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED = 6;

//要运行的Callable

private Callable callable;

//Callable运行完后的结果

private Object outcome;

//执行任务的线程

private volatile Thread runner;

//get方法阻塞的线程队列

private volatile WaitNode waiters;

FutureTask 其实就是一个 Runnable,因此主要看一下两个方法 run 和 get

2、run 方法

public void run() {

if (state != NEW ||

!pareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

try {

Callable c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

// 执行 call,并返回结果

result = c.call();

ran = true;

} catch (Throwable ex) {

result = null;

ran = false;

setException(ex);

}

if (ran)

// 保存结果

set(result);

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

int s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

protected void set(V v) {

if (pareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = v;

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

finishCompletion();

}

}

run方法其实不难,就是运行,然后CAS保存值

3、get 方法

public V get() throws InterruptedException, ExecutionException {

int s = state;

if (s <= COMPLETING)

s = awaitDone(false, 0L);

return report(s);

}

public V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

if (unit == null)

throw new NullPointerException();

int s = state;

if (s <= COMPLETING &&

(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

throw new TimeoutException();

return report(s);

}

get 方法有两个,核心方法就是 awaitDone 和 report(report就是保存值的,比较简单)。接下来分析 awaitDone

private int awaitDone(boolean timed, long nanos) throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

if (Thread.interrupted()) {

// 如果线程已中断,则直接将当前节点q从waiters中移出

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

if (s > COMPLETING) {

// 如果state已经是最终状态了,则直接返回state

if (q != null)

q.thread = null;

return s;

}

else if (s == COMPLETING) // cannot time out yet

// 如果state是中间状态(COMPLETING),意味很快将变更过成最终状态,让出cpu时间片即可

Thread.yield();

else if (q == null)

// 如果发现尚未有节点,则创建节点

q = new WaitNode();

else if (!queued)

// 如果当前节点尚未入队,则将当前节点放到waiters中的首节点,并替换旧的waiters

queued = pareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

else if (timed) {

// 线程被阻塞指定时间后再唤醒

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

}

LockSupport.parkNanos(this, nanos);

}

else

// 线程一直被阻塞直到被其他线程唤醒

LockSupport.park(this);

}

}

总结说来, FutureTask 的流程就是先 run 起来,等结束以后通过 CAS 赋值。(CAS是底层的锁操作)

调用 get 的时候无限循环去判断 state,如果state已经是最终状态了,则直接返回

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。