600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > terminated 线程_Java【多线程系列】JUC线程池—2. 原理(二) Callable和Future

terminated 线程_Java【多线程系列】JUC线程池—2. 原理(二) Callable和Future

时间:2019-05-18 21:10:47

相关推荐

terminated 线程_Java【多线程系列】JUC线程池—2. 原理(二) Callable和Future

在"Java多线程系列--“基础篇”01之 基本概念"中,我们介绍过,线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态。线程池也有5种状态;然而,线程池不同于线程,线程池的5种状态是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED

线程池状态定义代码如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf(int rs, int wc) { return rs | wc; }

说明:

ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。

ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

线程池各个状态之间的切换如下图所示:

1. RUNNING

(01) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。

(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态!道理很简单,在ctl的初始化代码中(如下),就将它初始化为RUNNING状态,并且"任务数量"初始化为0。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

2. SHUTDOWN

(01) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

(02) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3. STOP

(01) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

(02) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4. TIDYING

(01) 状态说明:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

(02) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。

当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5. TERMINATED

(01) 状态说明:线程池彻底终止,就变成TERMINATED状态。

(02) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

6. 拒绝策略介绍

线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。

当任务添加到线程池中之所以被拒绝,可能是由于:

第一,线程池异常关闭。

第二,任务数量超过线程池的最大限制。

线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy

线程池默认的处理策略是AbortPolicy

7. 拒绝策略对比和示例

下面通过示例,分别演示线程池的4种拒绝策略。

(01) DiscardPolicy 示例

(02) DiscardOldestPolicy 示例

(03) AbortPolicy 示例

(04) CallerRunsPolicy 示例

7.1 DiscardPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;public class DiscardPolicyDemo {private static final int THREADS_SIZE = 1;private static final int CAPACITY = 1;public static void main(String[] args) throws Exception {// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,new ArrayBlockingQueue(CAPACITY));// 设置线程池的拒绝策略为"丢弃"pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 新建10个任务,并将它们添加到线程池中。for (int i = 0; i < 10; i++) {Runnable myrun = new MyRunnable("task-"+i);pool.execute(myrun);}// 关闭线程池pool.shutdown();}}class MyRunnable implements Runnable {private String name;public MyRunnable(String name) {this.name = name;}@Overridepublic void run() {try {System.out.println(this.name + " is running.");Thread.sleep(100);} catch (Exception e) {e.printStackTrace();}}}

运行结果:

task-0 is running.task-1 is running.

结果说明:线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。

线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。

根据""中分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!

7.2 DiscardOldestPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;public class DiscardOldestPolicyDemo {private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception {// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,new ArrayBlockingQueue(CAPACITY)); // 设置线程池的拒绝策略为"DiscardOldestPolicy" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 新建10个任务,并将它们添加到线程池中。 for (int i = 0; i < 10; i++) {Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } // 关闭线程池 pool.shutdown(); }}class MyRunnable implements Runnable {private String name; public MyRunnable(String name) {this.name = name; } @Override public void run() {try {System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) {e.printStackTrace(); } }}

运行结果:

task-0 is running.task-9 is running.

结果说明:将"线程池的拒绝策略"由DiscardPolicy修改为DiscardOldestPolicy之后,当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中末尾的任务,然后将被拒绝的任务添加到末尾。

7.3 AbortPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;import java.util.concurrent.RejectedExecutionException;public class AbortPolicyDemo {private static final int THREADS_SIZE = 1;private static final int CAPACITY = 1;public static void main(String[] args) throws Exception {// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,new ArrayBlockingQueue(CAPACITY));// 设置线程池的拒绝策略为"抛出异常"pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());try {// 新建10个任务,并将它们添加到线程池中。for (int i = 0; i < 10; i++) {Runnable myrun = new MyRunnable("task-"+i);pool.execute(myrun);}} catch (RejectedExecutionException e) {e.printStackTrace();// 关闭线程池pool.shutdown();}}}class MyRunnable implements Runnable {private String name;public MyRunnable(String name) {this.name = name;}@Overridepublic void run() {try {System.out.println(this.name + " is running.");Thread.sleep(200);} catch (Exception e) {e.printStackTrace();}}}

(某一次)运行结果:

java.util.concurrent.RejectedExecutionExceptionat java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)at AbortPolicyDemo.main(AbortPolicyDemo.java:27)task-0 is running.task-1 is running.

结果说明:将"线程池的拒绝策略"由DiscardPolicy修改为AbortPolicy之后,当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException。

7.4 CallerRunsPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;public class CallerRunsPolicyDemo {private static final int THREADS_SIZE = 1;private static final int CAPACITY = 1;public static void main(String[] args) throws Exception {// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,new ArrayBlockingQueue(CAPACITY));// 设置线程池的拒绝策略为"CallerRunsPolicy"pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 新建10个任务,并将它们添加到线程池中。for (int i = 0; i < 10; i++) {Runnable myrun = new MyRunnable("task-"+i);pool.execute(myrun);}// 关闭线程池pool.shutdown();}}class MyRunnable implements Runnable {private String name;public MyRunnable(String name) {this.name = name;}@Overridepublic void run() {try {System.out.println(this.name + " is running.");Thread.sleep(100);} catch (Exception e) {e.printStackTrace();}}}

运行结果:

task-2 is running.task-3 is running.task-4 is running.task-5 is running.task-6 is running.task-7 is running.task-8 is running.task-9 is running.task-0 is running.task-1 is running.

结果说明:将"线程池的拒绝策略"由DiscardPolicy修改为CallerRunsPolicy之后,当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到"线程池正在运行的线程"中取运行。

8 Callable 和 Future 简介

Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。

8.1Callable

Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。

为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。

Callable的源码如下:

public interface Callable<V> {V call() throws Exception;}

说明:从中我们可以看出Callable支持泛型。

8.2Future

Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。

Future的源码如下:

public interface Future<V> {// 试图取消对此任务的执行。booleancancel(boolean mayInterruptIfRunning)// 如果在任务正常完成前将其取消,则返回 true。booleanisCancelled()// 如果任务已完成,则返回 true。booleanisDone()// 如有必要,等待计算完成,然后获取其结果。V get() throws InterruptedException, ExecutionException;// 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}

说明:Future用于表示异步计算的结果。它的实现类是FutureTask,在讲解FutureTask之前,我们先看看Callable, Future, FutureTask它们之间的关系图,如下:

说明:

(01) RunnableFuture是一个接口,它继承了Runnable和Future这两个接口。RunnableFuture的源码如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {void run();}

(02) FutureTask实现了RunnableFuture接口。所以,我们也说它实现了Future接口。

9 示例和源码分析

我们先通过一个示例看看Callable和Future的基本用法,然后再分析示例的实现原理。

import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.Executors;import java.util.concurrent.ExecutorService;import java.util.concurrent.ExecutionException;class MyCallable implements Callable {@Overridepublic Integer call() throws Exception {int sum = 0;// 执行任务for (int i=0; i<100; i++)sum += i;//return sum;return Integer.valueOf(sum);}}public class CallableTest1 {public static void main(String[] args)throws ExecutionException, InterruptedException{//创建一个线程池ExecutorService pool = Executors.newSingleThreadExecutor();//创建有返回值的任务Callable c1 = new MyCallable();//执行任务并获取Future对象Future f1 = pool.submit(c1);// 输出结果System.out.println(f1.get());//关闭线程池pool.shutdown();}}

运行结果:

4950

结果说明:在主线程main中,通过newSingleThreadExecutor()新建一个线程池。接着创建Callable对象c1,然后再通过pool.submit(c1)将c1提交到线程池中进行处理,并且将返回的结果保存到Future对象f1中。然后,我们通过f1.get()获取Callable中保存的结果;最后通过pool.shutdown()关闭线程池。

9.1submit()

submit()在java/util/concurrent/AbstractExecutorService.java中实现,它的源码如下:

public Futuresubmit(Callable task) {if (task == null) throw new NullPointerException();// 创建一个RunnableFuture对象RunnableFuture ftask = newTaskFor(task);// 执行“任务ftask”execute(ftask);// 返回“ftask”return ftask;}

说明:submit()通过newTaskFor(task)创建了RunnableFuture对象ftask。它的源码如下:

protected RunnableFuturenewTaskFor(Callable callable) {return new FutureTask(callable);}

9.2. FutureTask的构造函数

FutureTask的构造函数如下:

public FutureTask(Callable callable) {if (callable == null)throw new NullPointerException();// callable是一个Callable对象this.callable = callable;// state记录FutureTask的状态this.state = NEW; // ensure visibility of callable}

9.3FutureTask的run()方法

我们继续回到submit()的源码中。

在newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进行执行,最终会调用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中实现,源码如下:

public void run() {if (state != NEW ||!pareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 将callable对象赋值给c。Callable c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 执行Callable的call()方法,并保存结果到result中。result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}// 如果运行成功,则将result保存if (ran)set(result);}} finally {runner = null;// 设置“state状态标记”int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

说明:run()中会执行Callable对象的call()方法,并且最终将结果保存到result中,并通过set(result)将result保存。

之后调用FutureTask的get()方法,返回的就是通过set(result)保存的值。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。