Java多线程梳理

Java多线程

并发三要素

Java并发的三要素:可见性、原子性、有序性。线程不安全即这三个性质有至少一个被破坏了。

这三个要素分别是什么意思?分别由什么来保障的?即这个东西没有保障才导致的现成安全问题。

接下来,我们先来介绍这三个要素是什么,然后说明是什么最终导致了这三个要素的破坏,进而使线程不安全。最后,再来说Java是如何保证线程安全的。

可见性:一个线程对共享变量的修改,别的线程立即可见。

立即可见是说,不同的线程对应不同的cpu去使用共享变量去做运算,而共享变量是放在主存里的,所有线程对共享变量进行操作改变其值,都要写回主存,让其他线程可见。
这意味着,如果一个线程对共享变量的修改没有及时写回主存,让其他线程看到,而其他线程用了共享变量的旧值做了运算,就违背了可见性。

在计算机中,可见性的破坏是由什么导致的呢?
答案是cpu缓存!
以下面代码为例,对于多线程编程,如果thread1使用的是cpu1,thread2使用的是cpu2,当thread1执行i=10时,计算机做的是把i的初始值先加载到cpu1的高速缓存中,然后再赋值为10,再写回主存。而如果thread2把i的值加载到它的高速缓存发生在cpu1把i=10写回主存之前,那j的值就还是0.

1
2
3
4
5
6
//线程1执行的代码
int i = 0;
i = 10;

//线程2执行的代码
j = i;

原子性:一个操作或者多个操作,要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
以经典的转账问题为例:比如从账户A向账户B转1000元,那么必然包括2个操作:从账户A减去1000元,往账户B加上1000元。如果只进行了第一个步骤,而因为某些原因,没有执行第二个步骤,就破坏了原子性。

有序性:程序执行的顺序按照代码的先后顺序执行。
但是计算机为了提高执行性能,编译器和处理器会对指令进行重排序。这就间接导致内存可见性的问题。

重排序分为编译器优化重排序、指令级并行重排序、内存系统重排序,第一个属于编译器重排序,后两个属于处理器重排序。

Java是如何解决并发问题的?
通过Java内存模型规范了JVM如何提供按需禁用缓存和编译优化的方法。
具体的方法有:volatile、synchronized 和 final 三个关键字,Happens-Before 规则。
接下来具体来说分别是如何保证三要素的。

原子性的保证:是通过synchronized和Lock来保证原子性的。(synchronized和Lock能够保证任一时刻只有一个线程执行该代码块)
原子操作:读取、赋值(必须是将数字赋值给某个变量,变量之间的相互赋值不是原子操作)。

1
2
3
4
x = 10;        //语句1: 直接将数值10赋值给x,也就是说线程执行这个语句的会直接将数值10写入到工作内存中
y = x; //语句2: 包含2个操作,它先要去读取x的值,再将x的值写入工作内存,虽然读取x的值以及 将x的值写入工作内存 这2个操作都是原子性操作,但是合起来就不是原子性操作了。
x++; //语句3: x++包括3个操作:读取x的值,进行加1操作,写入新的值。
x = x + 1; //语句4: 同语句3

可见性的保证:volatile关键字(当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。)
通过synchronized和Lock也能够保证可见性,synchronized和Lock能保证同一时刻只有一个线程获取锁然后执行同步代码,并且在释放锁之前会将对变量的修改刷新到主存当中。因此可以保证可见性。

有序性的保证:volatile只能保证一部分的有序性。而synchronized和Lock可以完全保证有序性。JMM层面是通过Happens-Before规则保证有序性的。
❓❓❓Happens-Before和前几种方法有什么关系,它具体是怎么实现的?

共享变量按照安全强度分为五个安全级别:不可变、绝对线程安全、相对线程安全、线程兼容和线程对立。
不可变:如String类这种被final修饰过的数据类型。
绝对线程安全:不管运行时环境如何,调用者都不需要任何额外的同步措施。
相对线程安全:对象调用的时候不作同步措施,但在程序连续执行调用的顺序上,要通过同步手段(加锁)保证调用的正确性。
线程兼容:对象本身并非线程安全的,在调用的时候就要通过同步手段保证线程安全。

线程安全的类:绝大多数平常使用的类,比如HashMap、StringBuilder等。
线程不安全的类:JUC包下的类,如NonCurrentHashMap,
线程对立: 线程对立是指无论调用端是否采取了同步措施,都无法在多线程环境中并发使用的代码。(在代码中很少出现,略)

线程安全的实现方法:
互斥同步:synchronized 和 ReentrantLock。

线程原理

线程生命周期

实践

创建线程

1
Thread thread = new Thread(new DemoThread());

创建线程基本就用这个方法。只是传参的时候,要传一个Runnable Target,用Thread创建线程,也是要重写Runnable接口的run方法。

Thread和Runnable的关系:Thread类是Runnable接口的实现类。

当然也可以自定义一个线程类,直接implements Runnable接口,@Override run方法。

线程生命周期中的方法调用

参考:https://www.cnblogs.com/paddix/p/5381958.html

wait、notify、sleep、yield、join

wait和notify是Object的方法,sleep、yield、join是Thread类方法。

wait

wait是将当前运行的线程挂起(进入阻塞状态),知道notify来唤醒该线程。

wait方法是通过monitor对象来实现的。

wait方法的使用要放在同步范围内,即:在synchronized修饰的代码块内

notify/notifyAll

notify表示持有对象锁的线程准备释放对象锁权限。

调用wait方法后,线程会释放对monitor对象的所有权;

一个通过wait方法阻塞的线程,必须同时满足以下两个条件才能被唤醒:

  1. 需要超时唤醒或调用了notify/notifyAll方法
  2. 线程唤醒后竞争到锁(monitor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Test {

public synchronized void testWait() {
System.out.println(Thread.currentThread().getName() + "START-------");
try {
wait(0); // wait使用
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "END-------");
}

public static void main(String[] args) throws Exception {
final Test test = new Test();
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
test.testWait();
}
}).start();
}
synchronized (test) {
test.notify(); // notify使用
}
Thread.sleep(2000);
System.out.println("分割线------------");
synchronized (test) {
test.notifyAll(); // notifyAll使用
}
}
}

sleep

sleep方法和wait方法功能类似,但是调用起来更简单。这是说,wait方法依赖于同步,而sleep方法可以直接调用

yield

yield是暂停当前线程(将Running状态改为Runnable状态),以便其他线程有机会执行。

join

join是父线程等待子线程执行完再执行,将异步执行的线程合并为同步执行。

join是通过wait方法将线程阻塞,如果join的线程还在执行,就把当前线程阻塞起来,直到join的线程执行完成,再执行当前线程。

线程安全的实现方法

互斥同步: synchronized 和 ReentrantLock
非阻塞同步: CAS, AtomicXXXX
无同步方案: 栈封闭,Thread Local,可重入代码

synchronized

概要

synchronized是java的一个关键字,依赖锁来保证线程安全的。

synchronized可以锁一个对象,也可以锁一个类。当作用于当前实例,或者指定的某个实例时,锁的是对象;当作用于一个静态方法,或者通过实例反射获取类的时候,锁的是一个类。

synchronized是基于monitor机制实现的。所谓monitor机制是说一个对象关联一个monitor锁,同一时间只能被一个线程持有。java对象会在对象头里存有相关的锁信息(锁标志位,锁计数器,当前持有monitor锁的线程id等)。

从jvm层面来说,jvm会在synchronized代码块的前后插入monitorenter和moniterexit指令。当线程想获取对象的monitor锁时,要查看锁计数器,当锁计数器为0的时候,表示该对象当前处于未加锁状态,该线程可以获得对象的monitor锁;对象会记录当前线程的id,当线程重入该对象的时候,锁计数器会+1,当线程释放monitor所有权的时候,锁计数器会-1,直到计数器减到0,表明该对象处于无锁状态;在对象处于加锁状态期间,其他线程获取monitor锁失败后会进入阻塞队列等待被唤醒。

从os层面来说,synchronized底层时通过互斥锁实现的。当多个线程存在竞争时,获取锁失败的线程会进入阻塞队列,在唤醒时会涉及到内核态和用户态的转换,比较消耗系统资源,因此针对该问题进行了锁优化(锁升级/锁膨胀)。

锁膨胀过程是不可逆的。偏向锁不涉及资源竞争,性能很高;自旋锁会通过自旋等待锁的释放,会提高响应速度,但牺牲了cpu资源;重量级锁响应速度会慢一点,但解放了cpu,提高了吞吐量。

synchronized的使用

synchronized可以锁对象,也可以锁一个类。

对象锁

对象锁包括方法锁(默认锁对象为this,当前实例对象)和同步代码块锁(用户指定锁对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SynchronizedObjectLock implements Runnable {
static SynchronizedObjectLock instence = new SynchronizedObjectLock();

@Override
public void run() {
// 同步代码块形式——锁为this,两个线程使用的锁是一样的,线程1必须要等到线程0释放了该锁后,才能执行
synchronized (this) {
System.out.println("我是线程" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束");
}
}

public static void main(String[] args) {
Thread t1 = new Thread(instence); // 两个线程抢同一个对象资源
Thread t2 = new Thread(instence);
t1.start();
t2.start();
}
}
/* 输出:
我是线程Thread-0
Thread-0结束
我是线程Thread-1
Thread-1结束
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//指定锁的对象,然后接一个同步代码块,
public class SynchronizedObjectLock implements Runnable {
static SynchronizedObjectLock instence = new SynchronizedObjectLock();
// 创建2把锁
Object block1 = new Object();
Object block2 = new Object();

@Override
public void run() {
// 这个代码块使用的是第一把锁,当他释放后,后面的代码块由于使用的是第二把锁,因此可以马上执行
synchronized (block1) {
System.out.println("block1锁,我是线程" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("block1锁,"+Thread.currentThread().getName() + "结束");
}

synchronized (block2) {
System.out.println("block2锁,我是线程" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("block2锁,"+Thread.currentThread().getName() + "结束");
}
}

public static void main(String[] args) {
Thread t1 = new Thread(instence);
Thread t2 = new Thread(instence);
t1.start();
t2.start();
}
}
/*输出:
block1锁,我是线程Thread-0
block1锁,Thread-0结束
block2锁,我是线程Thread-0  // 可以看到当第一个线程在执行完第一段同步代码块之后,第二个同步代码块可以马上得到执行,因为他们使用的锁不是同一把
block1锁,我是线程Thread-1
block2锁,Thread-0结束
block1锁,Thread-1结束
block2锁,我是线程Thread-1
block2锁,Thread-1结束
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//锁方法
public class SynchronizedObjectLock implements Runnable {
static SynchronizedObjectLock instence = new SynchronizedObjectLock();

@Override
public void run() {
method();
}

public synchronized void method() {
System.out.println("我是线程" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束");
}

public static void main(String[] args) {
Thread t1 = new Thread(instence);
Thread t2 = new Thread(instence);
t1.start();
t2.start();
}
}

类锁

锁静态方法或直接通过反射锁类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SynchronizedObjectLock implements Runnable {
static SynchronizedObjectLock instence1 = new SynchronizedObjectLock();
static SynchronizedObjectLock instence2 = new SynchronizedObjectLock();

@Override
public void run() {
method();
}

// synchronized用在静态方法上,默认的锁就是当前所在的Class类,所以无论是哪个线程访问它,需要的锁都只有一把
public static synchronized void method() {
System.out.println("我是线程" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束");
}

public static void main(String[] args) {
Thread t1 = new Thread(instence1);
Thread t2 = new Thread(instence2);
t1.start();
t2.start();
}
}
/* 输出:
我是线程Thread-0
Thread-0结束
我是线程Thread-1
Thread-1结束
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 锁类
public class SynchronizedObjectLock implements Runnable {
static SynchronizedObjectLock instence1 = new SynchronizedObjectLock();
static SynchronizedObjectLock instence2 = new SynchronizedObjectLock();

@Override
public void run() {
// 所有线程需要的锁都是同一把
synchronized(SynchronizedObjectLock.class){
System.out.println("我是线程" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束");
}
}

public static void main(String[] args) {
Thread t1 = new Thread(instence1);
Thread t2 = new Thread(instence2);
t1.start();
t2.start();
}
}
/*输出:
我是线程Thread-0
Thread-0结束
我是线程Thread-1
Thread-1结束
*/

synchronized的原理(synchronized是如何保证线程安全的)

原理简介

synchronized是基于monitor锁机制保证线程安全的。

在JVM层面:

会通过MonitorenterMonitorexit指令作用在synchronized域的前后,通过对象锁计数器的+1/-1,保证代码块的同步。在同一时间每个对象与一个monitor锁关联,且只能被一个线程获得。当一个对象尝试获得monitor锁的时候:1、当monitor计数器为0,获取成功,计数器+1,别的线程再获得就只能等待;2、当对象已经拿到monitor所有权,又重入了这把锁,计数器会一直累加;3、当monitor锁已被其他线程获取了,会等待锁的释放。

当对象释放monitor所有权的时候,计数器会-1,当计数器不为0的时候,对象并不会释放对monitor锁的所有权,下一次是可以重入的;当计数器为0的时候,会释放对monitor锁的所有权,表明其他线程可以尝试获取到锁资源。

在操作系统层面:

monitor锁本质又是依赖操作系统底层的互斥锁(mutex lock)实现的,线程A获取对象的锁后,线程B如果再申请对象的锁,会进行放到同步队列阻塞,直到A释放锁对象,B才能被唤醒去竞争对象锁。该过程涉及线程的阻塞和唤醒,在os层面对象用户态到核心态的转换,开销很大。因此在java1.6,对锁进行了优化。

synchronized对于其他线程可见是基于Happens-Before对监视器锁制定的一条规则:对同一个监视器的解锁happens-before对该监视器的加锁。

JVM锁优化

当程序初次进入synchronized代码块的时候,对象头里的锁状态由无锁转换为偏向锁。偏向锁表现为同一线程可重入(Mark Word里存有线程ID)。

当存在锁竞争时,偏向锁升级为自旋锁(轻量级锁)。

偏向锁相对自旋锁只有一次CAS操作,效率极高。而自旋锁存在“忙等”,涉及多次CAS操作。

当锁竞争严重时(自旋次数超过10次),自旋锁将再次升级为重量级锁

锁升级

在整个获得锁和释放锁的过程中,通过CAS操作修改对象头里的锁标志位。

CAS是一种能够实现变量同步无锁算法。该算法涉及三个操作数,需要读写的内存值V。进行比较的值A(工作内存对主存V的拷贝副本),和要写入的新值B。当且仅当V = A时,CAS用B来更新V,否则不执行任何操作。

CAS存在的问题:ABA问题、自旋忙等、只能保证一个共享变量的原子操作。

锁粗化:当JVM检测到连续几条加锁和释放锁的操作,会对锁进行粗话,在更大范围内进行加锁和释放锁。(如连续多个StringBUffer的append操作)

锁消除:代码层面要求同步,但数据层面不涉及竞争,会删除不必要的加锁操作。(字符串拼接的时候,在数据不涉及竞争时,底层用StringBuilder实现)

锁的优缺点对比

优点 缺点 使用场景
偏向锁 加锁和解锁不需要CAS操作,没有额外的性能消耗,和执行非同步方法相比仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步快的场景
轻量级锁 竞争的线程不会阻塞,提高了响应速度 如线程成始终得不到锁竞争的线程,使用自旋会消耗CPU性能 追求响应时间,同步快执行速度非常快
重量级锁 线程竞争不适用自旋,不会消耗CPU 线程阻塞,响应时间缓慢,在多线程下,频繁的获取释放锁,会带来巨大的性能消耗 追求吞吐量,同步快执行速度较长

java对象头

java对象头包含三个部分:

  1. MarkWord(32bit)
  2. 指向类的指针(32bit,指向存在方法区的Class对象)
  3. 数组长度(当对象是数组的时候才有这部分)

锁信息在java对象头的MarkWord里。

java对象头-markword

synchronized与Lock

synchronized和Lock的区别:

synchronized Lock
实现层面 Java关键字,在JVM层面实现加锁和解锁。 Lock时一个接口,在代码层面实现的加锁和解锁。
作用范围 代码块、方法上 代码块
释放锁的时机 异常时自动释放锁 需要在finally显示释放锁,不会自动释放
持有锁时间 线程拿不到锁要自旋或阻塞 可以设置超时时间
获取锁判断 不知道是否获取锁 可以通过tryLock进行判断
公平性 非公平 公平 / 非公平
中断 不可中断 可中断

synchronized和ReentrantLock的对比:

synchronized ReentrantLock
锁实现机制 monitor机制(mutex lock) 依赖AQS
锁释放 自动释放 显示释放
锁类型 非公平锁 公平 / 非公平
条件队列 关联一个条件队列 关联多个条件队列
可重入性 可重入 可重入
灵活性 支持响应中断、超时、尝试获取锁

volatile

概要

在并发环境下,volatile修饰的变量可以保证可见性和有序性。

实现原理

可见性的实现(内存屏障lock前缀指令缓存一致性

volatile是通过内存屏障(一条lock前缀的cpu指令)实现可见性的,当volatile修饰的变量在写的时候,JVM会向处理器发送一条lock前缀指令,将该变量在处理器缓存行中的数据写回系统内存。各处理器再通过缓存一致性协议保证缓存该变量的数据是一致的。

缓存一致性协议:每个处理器通过嗅探在总线上传播的数据来检查自己缓存行数据的内存地址被修改,此时处理器会将缓存行数据设置成无效状态,并重新从系统内存中把数据读到处理器缓存里。

有序性的实现(Happens-Before原则)

volatile通过Happens-Before原则保证有序性,Happens-Before对volatile变量的一条原则是:对volatile变量的写happens-before任意后续多个volatile变量的读。

在jvm层面也是通过内存屏障实现的:对于volatile域的写,分别在前和后加一道屏障,表示禁止与上面的普通写和下面volatile读/写重排序;对于volatile域的读,分别在其后加两道内存屏障,表示禁止与下面的普通读和普通写重排序。

应用场景

  1. 状态标志:标识一个Boolean变量,指示发生了一次重要的一次性事件;

  2. 单例模式的双重校验(double check)实现方式中,对于单例要用volatile声明;

  3. 独立观察:当有一个变量不断更新,后台需要定时读取的时候,需要用volatile声明。

线程池

线程池简介

线程池的UML图

Java线程池UML

  • <interface>Executor
    • void execute(Runnable command);
  • <interface>ExecutorService extends <interface>Executor
    • <T> Future<T> submit(Callable<T> task)
    • <T> Future<T> submit(Runnable task, T result);
    • Future<?> submit(Runnable task);
  • <class>AbstractExecutorService implements <interface>ExecutorService
  • <class>ThreadPoolExecutor extends <class>AbstractExecutorService

Executor是顶层接口,只有一个execute方法,任务执行的时候需要传入一个Runnable的对象,并且任务执行完没有返回值;

ExecutorService继承于Executor,也是一个接口,但是提供了更加丰富的功能。如提供了submit执行任务的方法,可以配合Future使用,得到任务执行的状态和执行成功的返回值。同时可以通过shutdown方法关闭线程、cancel取消任务、invoke得到任务执行的返回结果。

AbstractExecutorServiceExecutorService的抽象实现类,实现了submit、shutdown、invoke、cancel等方法。

ThreadPoolExecutorAbstractExecutorService的子类,是对线程池复杂功能的具体实现。Executors工具类的三种线程池(FixedThreadPool/SingleThreadPool/CachedThreadPool)就是通过ThreadPoolExecutor来实现的。

Future
Reference
Future保存异步计算的结果,可以在我们执行任务时去做其他工作。

通过Future可以得到任务的返回结果:

  1. 通过isDone()方法,判断任务是否已完成,并可以通过cancel()方法中断取消任务;
  2. get()可以获得重写Callable接口的call方法的返回结果;
  3. get()方法可以通过设置时间参数判断任务是否执行超时。

执行方法的返回值

execute方法没有返回值,submit方法都有一个Future的返回值,Callable的task相比于Runnable的task有一个返回值,在调用future.get()的时候会将Callable的返回值返回。

使用线程池的好处

  1. 降低资源消耗:线程池创建线程后可以反复利用,省去线程反复创建、销毁带来的资源消耗;
  2. 提高响应速度:线程池内有空闲线程时,可以及时响应任务,执行线程;
  3. 提高线程管理性:线程池提供最大线程数参数,避免线程无限创建,会造成系统崩溃;
  4. 可扩展功能:支持任务定时执行、延迟执行。

几种不同类型的线程池及使用场景

类型介绍及场景使用

Executors类几个常见的静态方法,对应不同类型的线程池。其中FixedThreadPoolSingleThreadExecutorCachedThreadPool都是用ThreadPoolExecutor创建的,ScheduledThreadPool是用ScheduledThreadPoolExecutor创建的(ScheduledThreadPoolExecutorThreadPoolExecutor的子类)。

  1. FixedThreadPool : 线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程;
  2. CachedThreadPool :超过corePoolSize后,会根据线程的空闲时间释放线程,如果有新的任务没有空闲线程再新建;
  3. SingleThreadExecutor :线程池中只初始化一个线程来顺序执行任务。如果该线程异常结束,会重新创建一个新的线程继续执行任务;
  4. ScheduledThreadPool :用于执行定时任务,周期任务;

线程池的状态

线程池内线程的运行状态(runState)和线程数量(workerCount)由一个变量进行管理,高3位保存runState,低29位保存workCount。

运行状态:

  • RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;
  • SHUTDOWN:关闭状态,不再接受新提交的任务,但可以处理阻塞队列中的任务
  • STOP:不接受新任务,也不处理阻塞队列中的任务
  • TIDYING:所有任务都已经终止,workerCount=0
  • TERMINATED:在terminated()方法执行完进入该状态

线程池生命周期

线程池的使用

创建线程池

创建规范

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,手动设置参数可以明确线程池的运行规则,避免资源耗尽的风险。

newFixedThreadPoolnewSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
newCachedThreadPoolnewScheduledThreadPool:主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

线程池的参数

  1. corePoolSize
  2. maximumPoolSize
  3. keepAliveTime :空闲线程存活时间
  4. unit: 空闲线程存活时间单位
  5. workQueue
    • ArrayBlockQueue 有界队列,初始化的时候必须制定容量
    • LinkedBlokingQueue 指定容量为有界,不指定为无界,默认大小为Integer.MAX_VALUE
    • SynchronousQueue 无界队列,无缓冲,不存数据
    • PriorityBlockingQueue 无界队列,排序规则和PriorityQueue一样,需要实现java.lang.Comparable接口
    • DelayQueue 无界队列,要实现java.util.concurrent.Delayed接口;返回值就是队列元素被释放前的保持时间;会通过其take()方法释放此对象;可应用于定时关闭连接、缓存对象,超时处理等各种场景;
  6. threadFactory 线程工厂,一般情况下使用默认,自定义线程工厂可以进行一些前置处理,跟踪线程状态
  7. handler
    • AbortPolicy 默认,队列满了丢任务抛出异常
    • DiscardPolicy 队列满了丢任务不异常
    • DiscardOldestPolicy 将最早进入队列的任务删掉,之后再尝试加入队列
    • CallerRunsPolicy 如果添加到线程池失败,那么主线程会自己去执行该任务

线程数的选取策略

性质不同的任务可用使用不同规模的线程池分开处理:

  1. CPU密集型: 尽可能少的线程,Ncpu+1
  2. IO密集型: 尽可能多的线程, Ncpu*2,比如数据库连接池
  3. 混合型: CPU密集型的任务与IO密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。

工作队列的选取策略

Reference

  • ArrayBlockQueue :基于数组实现,数据更新不频繁,没有并发吞吐量的要求。比如公司人事部门人员离职变更,相关部门进行数据同步等。
  • LinkedBlokingQueue :基于链表实现,对并发吞吐量要求比较高的时候,如并发场景下单的短信通知等场景。
  • SynchronousQueue : 基于双栈双队列实现的,不确定生产者请求数量但希望任务及时被处理掉,可以使用该阻塞队列为每个生产者分配一个消费线程。比如电话转接人工客服。newCachedThreadPool就是用该阻塞队列创建的。
  • PriorityBlockingQueue : 基于数组实现,vip排队购票
  • DelayQueue : 基于优先队列实现,用户下单超时自动取消订单。

拒绝策略的选取策略

Reference

  • AbortPolicy 关键业务时可以使用,在系统不能承受并发量的时候可以通过异常发现;
  • DiscardPolicy 无关紧要的业务,博客网站的阅读量;
  • DiscardOldestPolicy 老消息和新消息重叠了,老消息如果没来得及处理,可以丢弃,直接处理新消息;
  • CallerRunsPolicy 任务执行不允许失败的、但对性能要求不高的、并发量小的场景下使用。

执行任务

执行任务的方法

Executor接口下提供了execute方法执行任务,该执行方法是没有返回值的;ExecutorService接口下提供了submit方法执行任务,该方法是一个泛型方法,返回的是一个Future类。

Runnable & Callable

Runnable Callable
接口类型 普通接口 泛型接口
重写方法 run call
返回结果 没有返回结果,不能抛出异常 有返回结果,能抛出异常

Future & FutureTask

ExecutorService.submit()方法用Future来接收。 Future是一个接口类,FutureTaskFuture的实现类。

Future模式在于客户端发起任务后,在服务器处理任务期间,客户端仍然可以做其他工作,而不需要等待服务器响应才能继续别的任务。

<interface>Future定义的几个方法:

  • cancel(boolean mayInterruptIfRunning) :用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false;
  • isCancelled() :表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone() :表示任务是否已经完成,若任务完成,则返回true;
  • get() :用来获取执行结果,这个方法会产生阻塞,阻塞的线程为调用get()方法的线程,会一直等到任务执行完毕返回结果,之后阻塞的主线程才能够往后执行。
  • get(long timeout, TimeUnit unit) :用来获取执行结果,如果在指定时间内,还没获取到结果,就会抛出TimeoutException异常(慎用这个方法,因为有很多坑)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// future 线程池 执行任务
public class Test {
public static void main(String[] args) throws Exception {
MyTask task = new MyTask();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Future<Object> future = threadPoolExecutor.submit(task);
System.out.println("执行其他任务1");
System.out.println("执行其他任务2");
System.out.println("调用get方法");
try {
Object result = future.get(5, TimeUnit.SECONDS); // 线程阻塞
System.out.println(result.toString());
} catch (Exception e) {
System.out.println(e);
}
}

}

class MyTask implements Callable<Object> {
@Override
public Object call() throws Exception {
System.out.println("execute a task.");
return "ok";
}
}
/*
执行其他任务1
执行其他任务2
调用get方法
execute a task.
ok
*/

定期定时任务

ScheduledThreadPoolExecutor (extends ThreadPoolExecutor)

  1. schedule
  2. scheduleAtFixedRate 无论周期内任务实际执行了多久,都以固定时间间隔触发,执行任务
  3. scheduleWithFixedDelay 上一次任务执行完,间隔固定时间,再执行下一个任务
1
2
3
4
5
6
7
8
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

关闭线程池

  1. shutdown() 等待当前任务执行完关闭
  2. shutdownNow 立刻停止正在执行的任务,并关闭线程
  3. awaitTermination() 等待指定的时间关闭线程

ThreadPoolExecutor

类继承关系

  1. 顶层接口``Executor`提供了一种思想:将任务提交和任务执行进行解耦。
  2. ExecutorService接口增加了一些能力:
    • 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法
    • 提供了管控线程池的方法,比如停止线程池的运行
  3. AbstractExecutorService将执行流程进行串联,保证下层的实现只需关注一个执行任务的方法即可。
  4. ThreadPoolExecutor实现最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务。

任务的提交

通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// submit()在ExecutorService中的定义
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

任务的执行

任务的执行过程

execute –> addWorker –>runworker (getTask)

execute

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// double check: c, recheck
// 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
//如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 往线程池中创建新的线程失败,则reject任务
else if (!addWorker(command, false))
reject(command);
}

addWorker()

addWorker主要负责创建线程并执行任务。

  1. 通过CAS更新工作线程的数量;
  2. 新建一个工作线程,并获取全局锁(ReentrantLock);
  3. 获取线程池状态,当线程池出于运行态或者工作队列中有未执行完的任务,会释放全局锁并启动线程执行任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
// CAS更新工作线程数量
retry:
for (;;) {
int c = ctl.get();

// 获取当前线程状态
int rs = runStateOf(c);

if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

// 内层循环,worker + 1
for (;;) {
// 线程数量
int wc = workerCountOf(c);
// 如果当前线程数大于线程最大上限CAPACITY return false
// 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// worker + 1,成功跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;

// CAS add worker 失败,再次读取ctl
c = ctl.get();

// 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
if (runStateOf(c) != rs)
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {

// 新建线程:Worker
w = new Worker(firstTask);
// 当前线程
final Thread t = w.thread;
if (t != null) {
// 获取主锁:mainLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {

// 线程状态
int rs = runStateOf(ctl.get());

// rs < SHUTDOWN ==> 线程处于RUNNING状态
// 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {

// 当前线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();

// workers是一个HashSet<Worker>
workers.add(w);

// 设置最大的池大小largestPoolSize,workerAdded设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {

// 线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

runworker()

runworker是线程池核心代码,用于启动线程后执行任务操作。

  1. 执行任务前,对worker实例进行加锁,保证线程不被其他线程中断(但可以被线程池中断);
  2. 当线程池状态不是中断的时候,执行run方法;
  3. 解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 先执行firstTask,再从workerQueue中取task(getTask())

while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

getTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //
workQueue.take(); //
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

Worker类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

private static final long serialVersionUID = 6138294804551838833L;

// task 的thread
final Thread thread;

// 运行的任务task
Runnable firstTask;

volatile long completedTasks;

Worker(Runnable firstTask) {

// 设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
setState(-1);
this.firstTask = firstTask;

// 利用ThreadFactory和 Worker这个Runnable创建的线程对象
this.thread = getThreadFactory().newThread(this);
}

// 任务执行
public void run() {
runWorker(this);
}

}

任务的关闭

shutdown()

shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以关闭线程
checkShutdownAccess();
//设置线程池状态
advanceRunState(SHUTDOWN);
//尝试中断worker
interruptIdleWorkers();
//预留方法,留给子类实现
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有的worker
for (Worker w : workers) {
Thread t = w.thread;
//先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
//注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
//它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdownNow()

shutdownNow先将线程池状态设置为STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的worker,然后清空任务队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//检测权限
advanceRunState(STOP);
//中断所有的worker
interruptWorkers();
//清空任务队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有worker,然后调用中断方法
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

ConcurrentHashMap

概要

ConcurrentHashMap1.7使用的是分段锁。ConcurrentHashMap1.7在对象中用一个segment数组将Hash表进行分段,每个segment相当于一个Hashtable,然后put的时候把key通过hash映射到对应的segment中,对该segment进行加锁。

ConcurrentHashMap1.8使用的数组+链表+红黑树的方式实现,而加锁则采用CAS和synchronized实现。

Hashtable慢是因为使用了synchronized为put等操作进行加锁,而synchronized加的是对象锁,锁住了整个Hash表,因此效率低。

ConcurrentHashMap1.8

put

  1. 数组为空就初始化;

  2. 根据key计算hash值,映射到数组下标;

  3. 如果当前数组位置为空的话,CAS将新值put;

  4. 如果数组在扩容,就帮助进行数据迁移;

  5. 否则,对该数组位置的node进行加锁(synchronized):

    1. 如果当前node对应的数据结构是链表,遍历链表,遇到相等的key的node就覆盖;否则加到链表尾部;
    2. 如果当前node对应的数据结构是红黑树,调用红黑树插值方法进行put
  6. 最后要进行一个链表是否转红黑树的判断,如果链表长度达到8,就进行转型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 得到 hash 值
int hash = spread(key.hashCode());
// 用于记录相应链表的长度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 初始化数组,后面会详细介绍
tab = initTable();

// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,
// 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
// 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
tab = helpTransfer(tab, f);

else { // 到这里就是说,f 是该位置的头结点,而且不为空

V oldVal = null;
// 获取数组该位置的头结点的监视器锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
// 具体源码我们就不看了,扩容部分后面说
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}

扩容机制

参考

AQS

概要

AQS是队列同步器,是一种用于设计锁或同步器的组件。

AQS定义了两种资源的共享方式,独占(ReentrantLock)和共享(ReadWriteLock)。独占又有公平和非公平两种策略。

AQS使用模板方法设计模式,提供了5个模板方法:

  • isHeldExclusively() 该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int) 线程以独占方式尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int) 线程以独占方式尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int) 线程以共享方式尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int) 线程以共享方式尝试释放资源,成功则返回true,失败则返回false。

AQS有两种核心方法。

AQS通过调用acquire方法以独占模式获取资源。

acquire方法做了这些事:

  1. 通过调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态;
  2. 通过调用addWaiter方法,将线程封装成一个结点(Node类),放到Sync queue里;
  3. 调用acquireQueued方法,让Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

AQS通过release方法以独占模式释放资源。

release方法中做了这些事:

  1. 通过tryRelease方法,去释放锁;
  2. 如果释放成功了,调unparkSuccessor方法找到队列第一个非CANCELLED的线程,将其unpark(唤醒线程);

简介

AQS(抽象队列同步器)

抽象队列同步器(AbstractQueuedSynchronizer)是一种构建锁和同步器的框架,如:ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue。

AQS的核心思想是:如果被请求的共享资源空闲,线程设为有效,共享资源设为锁定状态;如果被请求的共享资源被占用,需要一套线程阻塞等待以及被唤醒时锁分配的机制。

AQS底层原理

同步状态变量

AQS使用一个int成员变量state来表示同步状态,通过CAS操作该同步状态(state)进行原子操作实现对其值的修改。

同步队列

AQS底层的数据结构是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

具体实现为:用双向链表实现的同步队列和用单向链表实现的条件队列。同步队列是必须的,条件队列不是必须的,但当存在多个条件时,条件队列可以为多个。

AQS数据结构

AQS共享方式

  • 独占(Exclusive):只有一个线程能执行,如ReentrantLock。独占锁又分为公平锁和非公平锁。
    • 公平锁:线程按照在队列中的顺序,先到先得锁;
    • 非公平锁:无视队列顺序,谁抢到就是谁的。
  • 共享(share):多个线程可同时执行,如ReadWriteLock、Semaphore

在实现上,公平比非公平多了一个当前线程是否在队首的判断。

AQS通过调用acquire方法以独占模式获取资源。

acquire方法做了这些事:

  1. 通过调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态;(需要开发者重写)
  2. 通过调用addWaiter方法,将线程封装成一个结点(Node类),放到Sync queue里;
  3. 调用acquireQueued方法,让Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

AQS通过release方法以独占模式释放资源。

release方法中做了这些事:

  1. 通过tryRelease方法,去释放锁;
  2. 如果释放成功了,调unparkSuccessor方法找到队列第一个非CANCELLED的线程,将其unpark(唤醒线程);

unparkSuccessor在从后往前遍历node,并发环境下addWaiter方法node入队并非原子操作,源码中先让node->pre = tail,再让tail->next = node;如此第二步没走的话,从前往后遍历是遍历不了所有node的。

AQS通过调用acquireShared方法以共享模式获取资源。

AQS通过调用releaseShared方法以共享模式释放资源。

AQS需要子类实现的方法

方法名 方法描述
tryAcquire 以独占模式尝试获取锁,独占模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒
tryRelease 尝试独占模式下释放状态
tryAcquireShared 尝试在共享模式获得锁,共享模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒
tryReleaseShared 尝试共享模式下释放状态
isHeldExclusively 是否是独占模式,表示是否被当前线程占用

Node节点信息

字段名 类型 默认值 描述
SHARED Node new Node() 一个标识,指示节点使用共享模式等待
EXCLUSIVE Nodel Null 一个标识,指示节点使用独占模式等待
CANCELLED int 1 节点因超时或被中断而取消时设置状态为取消状态
SIGNAL int -1 当前节点的后节点被park,当前节点释放时,必须调用unpark通知后面节点,当后面节点竞争时,会将前面节点更新为SIGNAL
CONDITION int -2 标识当前节点已经处于等待中,通过条件进行等待的状态
PROPAGATE int -3 共享模式下释放节点时设置的状态,被标记为当前状态是表示无限传播下去
0 int 不属于上面的任何一种状态
waitStatus int 0 等待状态,默认初始化为0,表示正常同步等待,
pre Node Null 队列中上一个节点
next Node Null 队列中下一个节点
thread Thread Null 当前Node操作的线程
nextWaiter Node Null 指向下一个处于阻塞的节点

AQS底层设计模式

AQS底层使用了模板方法模式。

1
2
3
4
5
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

源码分析

同步状态变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值。
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

类的继承关系

1
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

// 版本序列号
private static final long serialVersionUID = 3737899427754241961L;
// 构造方法
protected AbstractOwnableSynchronizer() { }
// 独占模式下的线程
private transient Thread exclusiveOwnerThread;

// 设置独占线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

// 获取独占线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

transient:一个对象只要实现了Serilizable接口,这个对象就可以被序列化。当一个类的某些属性需要序列化,而其他属性不需要被序列化,比如一些敏感信息(如密码),为了安全起见,不希望在网络操作中被传输,这些信息对应的变量就可以加上transient关键字。

Node类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;

// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}

// 无参构造方法
Node() { // Used to establish initial head or SHARED marker
}

// 构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

acquire()

该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

首先,线程获取对象状态,看是否可以获得该对象的锁状态。如果可以获得,直接获得,返回。

如果不能获得,将线程封装成一个结点放到同步队列,不断尝试获取资源。

release()

以独占模式释放对象。

tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头结点不为空并且头结点的状态不为0,则释放头结点的后继结点。

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) { // 释放成功
// 保存头结点
Node h = head;
if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0
unparkSuccessor(h); //释放头结点的后继结点
return true;
}
return false;
}

参考资料

  1. https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

ReentrantLock

概要

ReentrantLock是一种以独占方式获取资源的可重入锁。

它实现了Lock接口(实现Lock接口的lock方法就是调用AQS的acquire方法),并在内部定义了一个sync的静态内部类,sync继承了AQS,并用公平和非公平两种方式重写了tryAcquire方法。

具体实现为:首先,获取当前线程状态,当state=0时,表示可以获取资源,此时公平策略会首先判断当前线程是否位于队首(非公平则没有这个条件判断)然后会CAS更新state,并设置线程独占;如果state不为0,但独占线程是当前线程,表明在重入,此时会CAS更新state增加重入次数。

ReentrantLock和synchronized的区别:

ReentrantLock synchronized
实现机制 依赖AQS monitor机制
锁类型 公平/非公平(默认) 非公平
释放锁的方式 unlock()显示调用(实现了Lock接口) 自动释放监视器锁
灵活性 支持中断、超时、尝试获取锁 不灵活
可重入性 可重入 可重入
条件队列 可以关联多个条件队列 只能关联一个条件队列

ReentrantLock UML

ReentrantLock-UML

  1. ReentrantLock实现了Lock接口;ReentrantLock 通过调用lock方法获得锁;lock方法里用的是AQS的acquire方法;AQS的acquire方法里调用了tryAquire方法;ReentrantLock需要重写tryAquire方法;
  2. ReentrantLock包含了Sync对象;Sync继承于AQS;在Sync中实现了FairSyncNonFairSync
1
2
3
final void lock() {
acquire(1);
}

对”1”的解释:它是设置“锁的状态”的参数也可以称为重入数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。由于ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推…这就是为什么获取锁时,传入的参数是1的原因了。

NonFairSync

1
2
3
4
5
6
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

FairSync

公平锁就比非公平锁多了一步判断:看当前线程是否位于同步队列的队头,如果在队头,说明没有等待更久的线程,符合公平策略,可以进行独占;否则反之。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 公平锁
static final class FairSync extends Sync {
// 版本序列化
private static final long serialVersionUID = -3000897897090466540L;

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 尝试公平获取锁
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (c == 0) { // 状态为0
if (!hasQueuedPredecessors() && // 公平锁比非公平锁就多了这一步判断!
compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且比较并且设置状态成功
// 设置当前线程独占
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占据
// 下一个状态
int nextc = c + acquires;
if (nextc < 0) // 超过了int的表示范围
throw new Error("Maximum lock count exceeded");
// 设置状态
setState(nextc);
return true;
}
return false;
}
}

hasQueuedPredecessors方法:

1
2
3
4
5
6
7
8
//判断"当前线程"是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

参考资料

  1. https://www.cnblogs.com/dwlsxj/p/reentrantlock-principle-nonfairsync.html
  2. https://yuanrengu.com/2020/7691e770
  3. https://www.pdai.tech/md/java/thread/java-thread-x-lock-ReentrantLock.html

ThreadLocal

概要

ThreadLocal是一个类,和一般的线程同步机制不同,ThreadLocal为每个线程创建一个单独的副本,每个线程都独立地改变自己的变量副本,副本之间互不影响。

常见的使用场景是session管理和数据库链接管理。

ThreadLocal实现线程隔离的思路是,通过threadLocalMap缓存线程,通过以线程为key,获取线程对应的threadLocals。

ThreadLocalMap是ThreadLocal的静态内部类,通过Entry数组存键值对,该Entry继承了弱引用(WeakReference<ThreadLocal<?>>)类,通过线性探测法解决哈希冲突。

在使用线程池时,由于线程池可能会不销毁线程,导致对ThreadLocal强引用,从而使ThreadLocalMap的弱引用失效,因此需要主动通过remove方法防止内存泄漏。

简介

本地存储(ThreadLocal)是一个为每个线程创建单独的变量副本的类, 避免因多线程操作共享变量而导致的数据不一致的情况。

其特点在于,共享变量在不同线程间不存在依赖关系。

ThreadLocal比较常见的应用场景是,session管理和数据库链接管理。

对于ThreadLocal的理解(为什么要用ThreadLocal)

对于下面的这个数据库管理类,在多线程情况下,它可能会存在线程同步问题:不同的线程可能会多次调用openConnection 建立连接;t1调用connect进行数据库操作,t2调用closeConnection关闭链接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ConnectionManager {
private static Connection connect = null;

public static Connection openConnection() {
if (connect == null) {
connect = DriverManager.getConnection();
}
return connect;
}

public static void closeConnection() {
if (connect != null)
connect.close();
}
}

虽然可以通过synchronzied方法或者ReentrantLock进行线程间的同步,但是发现不同线程间的connect不存在依赖关系。而ThreadLocal可以为每个线程创建单独的变量副本,避免因多线程操作共享变量而导致的数据不一致的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionManager {

private static final ThreadLocal<Connection> dbConnectionLocal = new ThreadLocal<Connection>() {
@Override
protected Connection initialValue() {
try {
return DriverManager.getConnection("", "", "");
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
};

public Connection getConnection() {
return dbConnectionLocal.get();
}
}

如果我们希望通过某个类将状态(例如用户ID、事务ID)与线程关联起来,那么通常在这个类中定义private static类型的ThreadLocal实例。

ThreadLocal原理

ThreadLocal通过ThreadLocalMap类对当前线程及其threadlocals(线程局部变量,即共享变量)进行缓存,实现线程隔离。
<thread,threadlocals>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}

ThreadLocalMap

ThreadLocalMap 本质就是一个Map,区别在于:

  1. ThreadLocalMap 属于ThreadLocal的静态内部类
  2. 用Entry数组来存储Key, Value;Entry继承了WeakReference<ThreadLocal<?>>,降低对内存的占用;哈希冲突的时候采用顺移落位。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void set(ThreadLocal<?> key, Object value) {

// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();

if (k == key) {
e.value = value;
return;
}

if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

ThreadLocal的内存泄漏问题

内存泄漏的原因

使用线程池操作ThreadLocal对象的时候,因为线程池里面有不会销毁的线程,存在着对ThreadLocal的强引用,这就导致被final static修饰的ThreadLocalMap的弱引用也不会释放,因此会造成内存泄漏问题。

解决方法

通过调用ThreadLocal类提供的remove方法,该方法通过调用ThreadLocalMap的remove方法,根据key把对应Entry删除,从而防止内存泄漏。

原子类

简介

如synchronized、Lock等都是采取阻塞同步策略的悲观锁,而原子工具类则是采用非阻塞同步策略实现的乐观锁。

原子变量类比锁的粒度更细,更轻量。原子变量将发生竞争的范围缩小到单个变量上。

原子类在内部使用 CAS 指令(基于硬件的支持)来实现同步。这些指令通常比锁更快。

CAS

CAS 全称 Compare And Swap(比较与交换),CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下在旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。

是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,经过调查发现,其实现方式是基于硬件平台的汇编指令,就是说CAS是靠硬件实现的,JVM只是封装了汇编调用,那些AtomicInteger类便是使用了这些封装后的接口。

在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。J.U.C包中的原子类就是通过 CAS 来实现了乐观锁。

CAS存在的问题:

  1. ABA
  2. 自旋时间长开销大
  3. 只能保证一个共享变量的原子操作

Unsafe

https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html

原子类API使用