600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 线程池 volatile 原子性 并发工具类

线程池 volatile 原子性 并发工具类

时间:2020-01-08 02:57:23

相关推荐

线程池 volatile 原子性 并发工具类

目录

线程状态

线程池-基本原理

线程池 - Executors默认线程池

线程池 - ThreadPoolExecutor

线程池参数-拒绝策略

volatile

原子性

原子性 - AtomicInteger

AtomicInteger - 内存解析

AtomicInteger - 源码解析

悲观锁和乐观锁

并发工具类 - Hashtable

并发工具类 - ConcurrentHashMap基本使用

ConcurrentHashMap1.7原理

ConcurrentHashMap1.8原理

并发工具类 - CountDownLatch

并发工具类 - Semaphore

线程状态

如何验证线程的状态有几种?

API中查询Thread.State内部类, 可以看到虚拟机中线程的六种状态

NEW

至今尚未启动的线程处于这种状态。RUNNABLE

正在 Java 虚拟机中执行的线程处于这种状态。BLOCKED

受阻塞并等待某个监视器锁的线程处于这种状态。WAITING

无限期地等待另一个线程来执行某一特定操作的线程处于这种状态。TIMED_WAITING

等待另一个线程来执行取决于指定等待时间的操作的线程处于这种状态。TERMINATED

已退出的线程处于这种状态。

虚拟机中线程的六种状态?

NEW 创建对象

RUNNABLEstart()

BLOCKED无法获得锁对象

WAITINGwait()

TIMED_WAITING sleep()

TERMINATED所有代码执行完毕

线程池-基本原理

吃饭摔碗故事的解决思路?

1. 找一个柜子放碗, 此时柜子是空的

2. 第一次吃饭, 买一次碗

3. 吃完后将碗放入柜子

4. 第二次吃饭, 就不需要买碗了, 直接从柜子中拿

5. 吃完再次将碗放回柜子

之前使用多线程存在类似的问题? -> 效率问题

1. 用到线程就要创建

2. 用完之后线程就消亡了

如何解决?

1. 创建一个池子(线程池), 刚开始是空的

2. 有任务要执行时, 才会创建线程对象, 任务执行完毕, 将线程对象归还给池子

3. 所有任务都执行完毕, 关闭连接池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务。执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

简单来说,线程池就相当于是线程的集合。

线程池 - Executors默认线程池

Executors默认线程池使用思路?

1. 创建一个池子(线程池), 刚开始是空的 -> 使用Executors的静态方法 -> 查阅API

2. 有任务要执行时, 才会创建线程对象, 任务执行完毕, 将线程对象归还给池子 ->submit();

3. 所有任务都执行完毕, 关闭连接池 ->shutdown();

Executors类创建线程池的方法?

1. newCachedThreadPool();

创建一个可根据需要创建新线程的线程池, 最大容纳int的MAX_VALUE个线程

2. newFixedThreadPool(int 最大容量);

创建一个可重用固定线程数的线程池

创建线程池:newCachedThreadPool()

public class ThreadPoolDemo01 {public static void main(String[] args) throws InterruptedException {// 创建线程池服务对象, 可以控制线程池ExecutorService executorService = Executors.newCachedThreadPool();// 通过线程池服务对象, 创建线程executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + "线程在执行了");});Thread.sleep(1000);// 通过线程池服务对象创建线程executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + "线程在执行了");});// 关闭线程池executorService.shutdown();ExecutorService executorService = Executors.newFixedThreadPool(2);System.out.println(executorService);//}}

创建指定线程数量的线程池:newFixedThreadPool()

Executors类创建线程池的方法?1. newCachedThreadPool(); 创建线程池, 默认是空的, 最大容纳int的MAX_VALUE2. newFixedThreadPool(int 最大容量);如果查询默认的线程数量?1. debug查看, workers表示线程数量, 初始size=02. 调用ThreadPoolExecutor对象的getPoolSize方法代码示例:public class ThreadPoolDemo02 {public static void main(String[] args) {// 创建线程服务对象ExecutorService executorService = Executors.newFixedThreadPool(10);// 强转executorServiceThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;System.out.println(executor.getPoolSize()); //0// 启动线程1executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + "线程在执行了");});// 启动线程2executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + "线程在执行了");});System.out.println(executor.getPoolSize()); //2// 释放资源executorService.shutdown();}}

线程池 - ThreadPoolExecutor

上述方式都是java帮我们创建线程池对象, 如果我们想自己创建怎么办?

跟进上述两个方法的源码1. newCachedThreadPool();public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}2. newFixedThreadPool(int 最大容量);public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}我们发现底层都new了ThreadPoolExecutor, 自己创建的话可以使用该类!

餐厅员工服务举例:

1. 正式员工数量核心线程数量

2. 餐厅最大员工数最大线程数量

3. 临时员工被辞退空闲时长(值)空闲时间(值)

4. 临时员工被辞退空闲时长(单位) 空闲时间(单位)

5. 排队客户 任务队列

6. 从哪里招人 创建线程的方式

7. 当排队人数过多, 拒绝策略 要执行的任务过多时的解决方案

ThreadPoolExecutor pool = new ThreadPoolExecutor(

核心线程数量, //不能小于0

最大线程数量, //不能小于等于0, 最大数量 ?= 核心线程数量

空闲线程最大存活时间, //不能小于0

时间单位, //时间单位

任务队列, //不能为null

创建线程工厂,//不能为null

拒绝策略 //不能为null

);

public class ThreadPoolDemo03 {public static void main(String[] args) {// 创建线程池ThreadPoolExecutor pool = new ThreadPoolExecutor(3, //核心线程数量5, //最大线程数量2, //空闲线程最大存活时间TimeUnit.SECONDS, //秒new ArrayBlockingQueue<>(10), //任务队列(阻塞队列)Executors.defaultThreadFactory(), //默认工厂new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略(停止));// 创建线程对象pool.submit(new MyRunnable());pool.submit(new MyRunnable());// 释放资源pool.shutdown();}}class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "线程执行了");}}

线程池参数-拒绝策略

1. 什么时候拒绝?

提交的任务 > 线程池最大容量 + 任务队列容量的时候

2. 如何拒绝?

2.1 ThreadPoolExecutor.AbortPolicy(); 丢弃任务抛出异常(默认策略)

2.2 ThreadPoolExecutor.DiscardPolicy(); 丢弃任务不抛出异常(不推荐)

2.3 ThreadPoolExecutor.DiscardOldestPolicy(); 抛弃队列中等待时间最久的,将当前任务加入

2.4 ThreadPoolExecutor.CallerRunsPolicy(); 调用任务的run()绕过线程池执行

默认策略代码示例:

public class ThreadPoolDemo04 {public static void main(String[] args) {// 创建线程池ThreadPoolExecutor pool = new ThreadPoolExecutor(2, //核心线程数量5, //最大线程数量2, //空闲线程最大存活时间TimeUnit.SECONDS, //秒new ArrayBlockingQueue<>(10), //任务队列: 让任务在队列中等有线程空闲了,再从队列中获取任务并执行Executors.defaultThreadFactory(), //默认工厂: 底层会按照默认方式创建线程对象new ThreadPoolExecutor.AbortPolicy() //拒绝策略(默认): 提交任务 > 最大线程数量 + 任务队列时拒绝);// ThreadPoolExecutor.AbortPolicy() 丢弃任务抛出异常(默认策略)// 如果提交任务 > 最大线程数量 + 任务队列, 抛出异常for (int i = 1; i <= 16; i++) {pool.submit(new MyRunnable());// java.util.concurrent.RejectedExecutionException}// 释放资源pool.shutdown();}}

非默认任务拒绝策略:

ThreadPoolExecutor.DiscardPolicy(); 丢弃任务不抛出异常(不推荐)

只能看到线程池最大线程数量 + 任务队列容量条结果, 没有报错提示

ThreadPoolExecutor.DiscardOldestPolicy(); 抛弃队列中等待时间最久的,将当前任务加入

只能看到线程池最大线程数量 + 任务队列容量条结果, 其中最后一条是最的任务

ThreadPoolExecutor.CallerRunsPolicy(); 调用任务的run()绕过线程池执行

只能看到线程池最大线程数量 + 任务队列容量条结果, main方法帮我们执行了其他任务

volatile

当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。

使用volatile关键字会强制将修改的值立即写入主存! 后续再出详细的讲解

class Test {public static void main(String[] args) {//创建线程对象Girl g = new Girl();g.setName("女孩");Boy b = new Boy();b.setName("男孩");//开启线程g.start();b.start();}}public class Money {// 通过volatile关键字解决(修饰共享数据)// 强制要求每一次使用共享数据前, 先去堆看一下共享数据, 及时更新自己的[变量副本]为最新值public static volatile int money = 100000;}public class Boy extends Thread{@Overridepublic void run() {try {Thread.sleep(10);// 修改金额Money.money = 90000;} catch (InterruptedException e) {e.printStackTrace();}}}public class Girl extends Thread{@Overridepublic void run() {while (Money.money == 100000){}System.out.println("金额发生变动, 停止循环");}}

volatile关键字不能保证原子性(同步锁可以),

volatile关键字只能保证当前线程栈中, 变量副本的值是新的

送100次100个冰淇淋代码示例:class Test {public static void main(String[] args) {// 创建MyAtomThread对象MyAtomThread mt = new MyAtomThread();// 送100次100个冰淇淋for (int i = 1; i <= 100; i++) {Thread t = new Thread(mt);t.start();}}}public class MyAtomThread implements Runnable {private int count = 0; // 冰淇淋的个数@Overridepublic void run() {// 送100个冰淇淋for (int i = 1; i <= 100; i++) {/*代码运行结果可能错误! 问题分析: count++不是一个原子性的操作, 每一步操作都有可能被抢执行权!1.从共享数据中读取数据到本线程栈中2.修改本线程栈中的变量副本3.将本线程栈中的变量副本的值,赋值给共享数据*///Thread.sleep(10);count++;System.out.println(Thread.currentThread().getName() + "送了第" + count + "个冰激凌");}}}

原子性

原子性是指是指在一次或者多次操作过程中,要么全部执行成功要么全部执行失败,有着“同生共死”的感觉。即使在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程所干扰。我们先来看看哪些是原子操作,哪些不是原子操作,有一个直观的印象:

int a = 10; //1

a++; //2

int b=a; //3

a = a+1; //4

上面这四个语句中只有第1个语句是原子操作,将10赋值给线程工作内存的变量a

语句2(a++),实际上包含了三个操作:1. 读取变量a的值;2:对a进行加一的操作;3.将计算后的值再赋值给变量a,而这三个操作无法构成原子操作。

对语句3,4的分析同理可得这两条语句不具备原子性。

原子性 - AtomicInteger

在JDK1.5之后提供了一个在util包下的comcurrent包下的atomic包(原子包)

里面提供对各种类的原子操作

AtomicInteger常用方法

构造方法:

public AtomicInteger();初始值为0的新 AtomicInteger

public AtomicInteger(int initialValue);给定初始值的新 AtomicInteger

成员方法:

int get();获取值

int getAndIncrement();以原子方式将当前值+1, 返回自增的值

int incrementAndGet();以原子方式将当前值+1, 返回自增的值

int addAndGet(int data);以原子方式将当前值和参数相加, 返回相加结果

int getAndSet(int value);以原子方式设置为参数的值, 返回旧值

public class Demo01_Constructor {public static void main(String[] args) {// 空参构造 public AtomicInteger();AtomicInteger ac = new AtomicInteger();System.out.println(ac); //0// 带参构造 public AtomicInteger(int initialValue);AtomicInteger ac2 = new AtomicInteger(10);System.out.println(ac2); //10}}public class Demo02_Method {public static void main(String[] args) {// 创建对象AtomicInteger ac = new AtomicInteger(10);// int get(); 获取值System.out.println(ac.get()); //获取值 10// int getAndIncrement(); 以原子方式将当前值+1, 返回自增前的值System.out.println(ac.getAndIncrement()); //返回自增前的值 10System.out.println(ac.get()); //10 + 1 = 11// int incrementAndGet(); 以原子方式将当前值+1, 返回自增后的值System.out.println(ac.incrementAndGet()); //11 + 1 = 12// int addAndGet(int data); 以原子方式将当前值和参数相加, 返回相加结果System.out.println(ac.addAndGet(20)); //返回相加结果 32// int getAndSet(int value); 以原子方式设置为参数的值, 返回旧值System.out.println(ac.getAndSet(50)); //返回旧值 32System.out.println(ac.get()); //设置为参数的值 50}}

AtomicInteger - 内存解析

首先我们通过AtomicInteger将代码实现:class Test {public static void main(String[] args) {MyAtomThread mt = new MyAtomThread();for (int i = 1; i <= 100; i++) {Thread t = new Thread(mt);t.start();}}}public class MyAtomThread implements Runnable {//2. AtomicInteger(效率高,线程安全)AtomicInteger ac = new AtomicInteger(); //或者给0@Overridepublic void run() {for (int i = 1; i <= 100; i++) {// count++; //数据错误, 原因count++不具备原子性// int incrementAndGet(); 以原子方式将当前值+1, 返回自增后的值int count = ac.incrementAndGet();System.out.println(Thread.currentThread().getName() + "送了第" + count + "个冰激凌");}}}AtomicInteger原理: CAS + 自旋有三个操作数据 (内存值V, 旧值A, 要修改的值B)当旧值A == 内存值, 证明在当前线程操作时, 没有其他线程来过, 此时可以修改, 将V改为B当旧值A != 内存值, 证明在当前线程操作时, 有其他线程来过, 此时不能修改, 进行自旋自旋: 重新获取现在的最新内存值V, 继续进行上述判断简单理解:在修改共享数据时, 将修改前的旧值记录下来如果现在的内存值, 和原来的旧值一样, 证明没有其他县城操作过内存值, 则可以修改内存值如果现在的内存值, 和原来的旧值不一样, 证明有其他线程操作过内存值, 则不能修改内存值继续获取最新的内存值, 再次进行上述操作(自旋)

AtomicInteger - 源码解析

源码解析:public AtomicInteger(int initialValue) {value = initialValue;}// 先自增, 然后返回自增后的结果public final int incrementAndGet() {// this: 当前的AtomicInteger对象// 1: 自增一次// +1: 最后的就是自增后的结果return U.getAndAddInt(this, VALUE, 1) + 1;}@HotSpotIntrinsicCandidatepublic final int getAndAddInt(Object o, long offset, int delta) {// o: 内存值// v + delta: 修改后的值// v: 旧值int v;// do while: 自旋, 不断获取旧值do {v = getIntVolatile(o, offset);// 判断条件: weakCompareAndSetInt方法比较内存中的值和旧值是否相等// 情况1: 比较内存值和旧值是否相等, 相等就修改, 返回true结束循环// 情况2: 比较内存值和旧值是否相等, 不相等, 不能修改, 返回false继续循环(自旋转)} while (!weakCompareAndSetInt(o, offset, v, v + delta));return v;}

悲观锁和乐观锁

synchronized和CAS的区别:

1.相同点:

在多线程的情况下, 都可以保证共享数据的安全性

2.不同点

synchronized是从最坏的角度出发, 认为每次获取数据的时候, 别人都有可能修改, 所以每次操作共享数据之前, 都会上锁 (悲观锁)

CAS时候从乐观的角度出发, 认为每次获取数据的时候, 别人都不会修改, 所以不会上锁. 只不过在修改共享数据的时候, 检查一下别人有没有操作过这个数据 (乐观锁)

如果操作了, 那么再次获取最新的值

如果没有操作, 那么直接修改共享数据的值

并发工具类 - Hashtable

HashMap是线程不安全的, 在多线程环境下会存在问题

为了保证线程安全, 我们可以使用Hashtable, 但是Hashtable效率低

Hashtable底层是哈希表结构, 由数组 + 链表组成

数组默认长度16, 加载因子0.75 (存满12个要扩容)

链表是当计算当前元素要存入的位置有元素时, 先判断内容

如果一样则不存

如果不一样老元素挂在新元素下, 形成链表 (哈希桶)

Hashtable效率低的原因:

通过查看底层代码我们发现, 底层使用悲观锁(synchronized), 每一次操作都会讲整张表锁起来

并发工具类 - ConcurrentHashMap基本使用

HashMap: 线程不安全

Hashtable: 线程安全, 效率低 (底层悲观锁锁整张表)

ConcurrentHashMap: 线程安全, 效率高(分析JDK7和8的底层区别)

ConcurrentHashMap1.7原理

创建ConcurrentHashMap对象时:

创建一个长度为16的大数组, 加载因子是0.75 (Segment[])

创建一个长度为2的小数组, 将地址值赋值给0索引处, 其他索引位置都为null (HashEntry[])

0索引处数组是用来当做模板使用,当新元素添加进来会以0索引数组为模板来创建长度为2的小数组

添加元素时, 根据键的哈希值来计算出在大数组中的位置

如果为null, 按照模板创建小数组

创建完毕, 会二次哈希计算出在小数组中应存入的位置, 由于第一次都是null所以直接存入

如果不为null, 会二次哈希, 计算出在小数组中应存入的位置

如果小数组需要扩容, 则扩容为2倍 (存到索引1的地方)

如果不需要扩容, 则会判断小数组当前索引位置是否为null

如果为null代表没有元素, 直接存入

如果不为null代表有元素, 则根据equals方法比较属性值

一样则不存

不一样则将老元素挂在新元素下, 形成链表 (哈希桶)

综上所述, 如果这个大数组Segment[]存满了, 就是一个16*16的大哈希表

为什么效率高?

因为每一次操作只会锁小表 (小数组HashEntry[]), 不会锁大表

所以在JDK1.7之前, 某一时刻最多允许16个线程同时访问

ConcurrentHashMap1.8原理

相关快捷键:

Alt + 7: 底层中展示所有方法

Ctrl + Alt + Shift + U: 底层中展示继承结构

ConcurrentHashMap在JDK1.8底层分析:

结构: 哈希表 (数组 + 链表 + 红黑树)

线程安全: CAS机制 + synchronized同步代码块

1. 如果使用空参构造创建ConcurrentHashMap对象时, 则什么都不做 (查看空参构造及父类的空参)

2. 在第一次添加元素时 (调用put方法时) 创建哈希表 (initTable方法)

计算当前元素应存入的索引位置

如果为null, 代表没有元素, 则通过CAS算法, 将本节点添加到数组中

如果不为null, 代表有元素, 则利用volatile获得当前索引位置最新的节点地址挂在它下面, 形成链表, 链表长度大于等于8的时候, 自动转为红黑树

3. 每次操作, 会以链表或者树的头结点为锁对象, 配合悲观锁(synchronized) 保证多线程操作集合时的安全问题

并发工具类 - CountDownLatch

CountDownLatch同步计数器,当计数器数值减为0时,所有受其影响而等待的线程将会被激活,这样保证模拟并发请求的真实性。

CountDownLatch应用场景

让一条线程等待其他线程执行完毕后再执行

CountDownLatch相关方法

1.CountDownLatch(int count);(构造方法)表示要等待的线程数量

2.public void await();让线程等待

3.public void countDown();表示当前线程执行完毕

案例: 使用代码实现, 妈妈等三个孩子吃饺子, 吃完收拾碗筷妈妈等待 await();sout("收拾碗筷");孩子*3补齐带参构造sout(name+吃饺子);说一声自己吃完了 countDown() 测试类开启4条线程创建CountDownLatch()对象传递给4条线程代码示例public class CountDownLatchTest {public static void main(String[] args) {//创建CountDownLatch()对象传递给4条线程CountDownLatch countDownLatch = new CountDownLatch(3); //要等待三个(孩子)线程//开启4条线程Mother mother = new Mother(countDownLatch);mother.start();Child01 c1 = new Child01(countDownLatch);Child02 c2 = new Child02(countDownLatch);Child03 c3 = new Child03(countDownLatch);c1.start();c2.start();c3.start();}}//孩子1class Child01 extends Thread {//补齐带参构造private CountDownLatch countDownLatch;public Child01(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {//打印结果for (int i = 1; i < 10; i++) {System.out.println(getName() + "在吃饺子" + ",吃了" + i + "个");}//说一声自己吃完了countDownLatch.countDown();}}//孩子2class Child02 extends Thread {//补齐带参构造private CountDownLatch countDownLatch;public Child02(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {//打印结果for (int i = 1; i < 15; i++) {System.out.println(getName() + "在吃饺子" + ",吃了" + i + "个");}//说一声自己吃完了countDownLatch.countDown();}}//孩子3class Child03 extends Thread {//补齐带参构造private CountDownLatch countDownLatch;public Child03(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {//打印结果for (int i = 1; i < 20; i++) {System.out.println(getName() + "在吃饺子" + ",吃了" + i + "个");}//说一声自己吃完了countDownLatch.countDown();}}//妈妈class Mother extends Thread {//补齐带参构造private CountDownLatch countDownLatch;public Mother(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {//等待try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}//打印结果System.out.println("妈妈正在收拾碗筷~");}}

并发工具类 - Semaphore

通常用于限制可以访问某些资源(物理或逻辑的)的线程数目

也可以比作同行证,给一个或多个线程发同行证,到出口(或执行完毕)时再把通行证归还,再发给下一个或多个线程

Semaphore构造方法

Semaphore(int 最大通行数量);

创建具有给定的许可数和非公平的公平设置的Semaphore

Semaphore(intpermits, booleanfair);

创建具有给定的许可数和给定的公平设置的Semaphore

Semaphore使用步骤分析

1. 需要有人管理这个通道 - 创建Semaphore对象

2. 有车子进来, 发放通行证 - acquire();发

3. 有车子出去, 收回通行证 - release();收

4. 如果通行证都发出去了, 那么只允许车子等待 - 自动完成

代码示例 (实现Runnable接口实现多线程)public class SemaphoreTest {public static void main(String[] args) {//创建MyRannable对象MyRannable mr = new MyRannable();//多次启动线程for (int i = 1; i <= 50 ; i++) {new Thread(mr).start();}}}class MyRannable implements Runnable{//1. 需要有人管理这个通道 - 创建Semaphore对象Semaphore semaphore = new Semaphore(2); //最大通行数量@Overridepublic void run() {//2. 有车子进来, 发放通行证 - acquire();发try {semaphore.acquire();System.out.println("获取通行证,车子进来!");semaphore.release();System.out.println("归还通行证,车子出去了!");} catch (InterruptedException e) {e.printStackTrace();}}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。