本文部分内容节选自《Java并发编程的艺术》

🚀 基础(上) → 🚀 基础(中) → 🚀基础(下) → 🤩集合(上) → 🤩集合(下) → 🤗JVM专题1 → 🤗JVM专题2 → 🤗JVM专题3 → 🤗JVM专题4 →😋JUC专题1 → 😋JUC专题2 → 😋JUC专题3

Lock接口

锁是用来控制多个线程访问共享资源的方式, 一般来说, 一个锁能够防止多个线程同时访问共享资源 (但是有些锁可以允许多个线程并发的访问共享资源, 比如读写锁)

使用 synchronized 关键字将会隐式地获得锁, 但是它将锁的获取和释放固化了, 也就是先获取再释放. 当然这种方式简化了同步的管理, 可是扩展性没有显式地获取和释放来的好

队列同步器

队列同步器 是用来构建锁或者其他同步组件的基础框架, 使用一个 int 成员变量表示同步状态, 通过内置的FIFO队列来完成资源获取线程的排队工作

同步器的主要使用方式是继承, 子类通过继承同步器并实现它的抽象方法来管理同步状态, 在抽象方法的实现过程中免不了要对同步状态进行过呢更改, 这时候就需要使用同步器提供的 3 个方法(getState() , setState(int state) , compareAndSetState(int expect, int update)) 来进行操作, 因为它们能够保证状态的改变是安全的

队列同步器的接口

重写同步器指定的方法时, 需要使用同步器提供的如下 3 个方法来访问或修改同步状态

  • getState() : 获取同步状态
  • setState() : 设置当前同步状态
  • compareAndSetState() : 使用CAS设置当前状态, 该方法能够保证状态设置的原子性

同步器可重写的方法如下

方法名称描述
protected boolean tryAcquire(int arg)独占式获取同步状态, 实现该方法需要查询当前状态并判断同步状态是否符合预期, 然后再进行CAS设置同步状态
protected boolean tryRelease(int arg)独占式释放同步状态, 等待获取同步状态的线程将会有机会获取同步状态
protected int tryAcquireShared(int arg)共享式获取同步状态, 返回大于等于0 的值, 表示获取成功, 反之, 获取失败
protected boolean tryReleaseShared(int arg)共享式释放同步状态
protected boolean isHeldExclusively()当前同步器是否在独占模式下被线程占用, 一般该方法表示是否被当前线程所独占

实现自定义同步组件时, 将会调用同步锁的模板方法

方法名称描述
void acquire(int arg)独占式获取同步状态, 如果当前线程获取同步状态成功, 则由该方法返回, 否则, 将会进入同步队列等待, 该方法将会调用重写的 tryAcquire(int arg) 方法
void acquireInterruptibly(int arg)acquire(int arg) 相同, 但是该方法响应中断, 当前线程未获取到同步状态而进入同步队列中, 如果当前线程被中断, 则该方法会抛出 InterruptedException 并返回
boolean tryAcquireNanos(int arg, long nanos)acquireInterruptibly(int arg) 基础上增加了超时限制, 如果在超时限制内没有获取到同步状态, 则返回 false, 否则返回 true
void acquireShared(int arg)共享式的获取同步状态, 如果当前线程未获取到同步状态, 将会进入同步队列等待, 与独占式获取的主要区别是同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg)acquireShared(int arg) 一样, 该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanos)acquireSharedInterruptibly(int arg) 的基础上增加了超时限制
boolean release(int arg)独占式的释放同步状态, 该方法会在释放同步状态之后, 将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg)共享式的释放同步状态
Collection<Thread> getQueuedThreads()获取等待在同步队列上的线程集合

队列同步器的实现分析

同步队列

同步器依赖于内部的同步队列来完成同步状态的管理, 当前线程获取同步状态失败时, 同步器会将当前线程及其等待状态等信息构造成一个节点并将其加入到同步队列, 同时会阻塞当前线程, 当同步状态释放时, 会把首节点中的线程唤醒, 使其再次尝试获取同步状态

节点是构成同步队列的基础, 同步器具有头节点(head)和尾节点(tail), 没有成功获取同步状态的线程将会成为节点加入该队列的尾部, 同步队列的基本结构如图所示

同步器将节点加入到同步队列的过程如图所示

同步队列遵循FIFO, 首节点是同步状态获取成功的节点, 首节点的线程在释放同步状态时, 会唤醒后继节点, 而后继节点将会在获取同步状态成功时将自己设置为首节点

设置首节点是通过获取同步状态成功的线程来完成的, 由于只有一个线程能够成功获取到同步状态, 因此设置头节点的方法不需要用CAS来保证, 它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可

独占式同步状态获取与释放

通过调用同步器的 acquire(int arg) 方法可以获取同步状态, 该方法对中断不敏感, 也就是由于线程获取同步状态失败后进入同步队列中, 后续对线程进行中断操作时, 线程不会从同步队列中移出

// 同步器的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
// 同步器的addWaiter和enq方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Ndoe enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
node.prev = t;
}
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}

节点进入同步队列之后, 就进入了自旋, 每个节点都在自省地观察, 当条件满足, 获取到了同步状态, 就从自旋状态中退出, 否则依旧留在自旋过程中

// 同步器的acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}

acquireQueued(final Node node, int arg) 方法中, 当前线程在"死循环"中尝试获取同步状态, 而只有头节点才能尝试获取同步状态, 为什么?

  1. 头节点是成功获取到同步状态的节点, 而头节点的线程释放了同步状态之后, 将会唤醒后续节点, 后续节点的线程被唤醒之后需要检查自己的前驱节点是不是头节点
  2. 维护同步队列的FIFO原则, 该方法中, 节点自旋获取同步状态的行为如图所示

节点与节点之间在循环检查的过程中基本不相互通信, 而是简单的判断自己的前驱节点是否为头节点, 这样就使得节点的释放规则符合FIFO, 并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的节点由于中断而被唤醒)

独占式同步状态获取流程如图所示

当前线程获取同步状态并执行了相应逻辑之后, 就需要释放同步状态, 使得后续节点能够继续获取同步状态, 通过调用同步器的 release(int arg) 方法可以释放同步状态, 该方法在释放了同步状态之后, 会唤醒其后续节点(进而使后续节点重新尝试获取同步状态)

// 同步器的release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
}
}

总结: 在获取同步状态时, 同步器维护一个同步队列, 获取同步状态失败的线程都会被加入到队列中并进行自旋; 移出队列的前提是前驱节点为头节点且能够获取到同步状态. 在释放同步状态时, 同步器调用 tryRelease(int arg) 方法释放同步状态, 然后唤醒头节点的后续节点

共享式同步状态获取与释放

共享式获取与独占式获取最大的区别在于同一时刻是否能有多个线程同时获取到同步状态.

通过调用同步器的 acquireShared(int arg) 方法可以共享式地获取到同步状态

// 同步器的acquiredShared和doAcquireShared方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) {
doAcquireShared(arg);
}
}

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = NULL;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}

共享式获取也需要释放同步状态, 通过调用 releaseShared(int arg) 方法可以释放同步状态

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

独占式超时获取同步状态

通过调用同步器的 doAcquireNanos(int arg, long nanoTimeout) 方法可以超时获取同步状态, 即在指定的时间段内获取同步状态, 如果获取到同步状态则返回 true, 否则, 返回 false

private boolean doAcquireNanos(int arg, long nanoTimeout) throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
if (nanosTimeout <= 0) {
return false;
}
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = out;
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}

独占式超时获取同步状态的过程如图所示

重入锁

重入锁 ReentrantLock, 顾名思义就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁. 除此之外还支持获取锁时的公平和非公平性选择

ReentrantLock虽然没像synchronized关键字一样支持隐式的重进入, 但是在调用 lock() 方法时, 已经获取到锁的进程, 能够再次调用 lock() 方法获取锁而不被阻塞

实现重进入

重进入是指任意线程在获得到锁之后能够再次获取到锁而不会被阻塞

ReentrantLock通过组合自定义同步器来实现锁的获取和释放. 以非公平性实现为例, 获取同步状态的代码如下所示

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAnsSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}

成功获取锁的线程再次获取锁, 只是增加了同步状态值, 这也就要求 ReentrantLock在释放同步状态时减少同步状态值, 该方法的代码如下所示

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

公平与非公平获得锁的区别

如果一个锁是公平的, 那么锁的获取顺序就应该符合请求的绝对时间顺序

公平锁的获取同步状态代码如下

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}

非公平锁可能导致线程"饥饿", 但是极少的线程切换保证了更大的吞吐量

读写锁

读写锁在同一时刻允许多个读线程访问, 但是在写线程访问时, 其他的读线程和写线程都会被阻塞. 读写锁维护了一个读锁和一个写锁, 通过分离读锁和写锁, 使得并发性相比一般的排他锁有很大的提升

一般情况下读写锁的性能比排他锁要好, 这是因为大多数场景是读大于写的. 在读大于写的情况下, 读写锁能够提供比排他锁更好的并发性和吞吐量. Java实现读写锁是 ReentrantReadWriteLock, 它的特性如下

特性说明
公平性选择支持非公平(默认)与公平的锁获取方式, 吞吐量非公平优于公平
重进入该锁支持重进入, 以读写线程为例: 读线程获取读锁后能够再次获取读锁, 写线程获取写锁之后能够再次获得写锁, 也能获得读锁
锁降级遵循获取写锁, 获取读锁再释放写锁的次序, 写锁能够降级为读锁

读写锁的接口

ReadWriteLock仅定义了获取读锁与写锁的两个方法, 即 readLock() 方法和 writeLock() 方法, 而其实现 ReentrantReadWriteLock , 除了接口方法外还提供了一些便于外界监控其内部工作状态的方法

方法名称描述
int getReadLockCount()返回当前读锁被获取的次数. 该次数不等于获取读锁的线程数
int getReadHoldCount()返回当前线程获取读锁的次数
boolean isWriteLocked()判断写锁是否被获取
int getWriteHoldCount()返回当前写锁被获取的次数

读写锁的实现分析

读写状态的设计

读写锁依赖于自定义同步器来实现同步功能, 而读写状态就是其同步器的同步状态.

读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态, 使得该状态的设计成为读写锁实现的关键

如果在一个整型变量上维护多种状态, 就一定需要 “按位切割使用” 这个变量, 读写锁将变量切分成了两个部分, 高16位表示读, 低16位表示写

假设当前同步状态为S, 当S不等于0时, 当写状态 (S & 0x0000FFFF) 等于 0 时, 则读状态 (S >>> 16) 大于 0, 即读锁已被获取

写锁的获取和释放

写锁是一个支持重进入的排它锁. 如果当前线程已经获取到了写锁, 则增加写状态. 如果当前线程在获取写锁时, 读锁已经被获取或者该线程不是已经获取到写锁的线程, 那么该线程进入等待状态

获取写锁的代码如下

protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread()) {
return false;
}
if (w + exclusiveCount(acquires) > MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
setState(c + acquires);
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
return false;
}
setExclusiveOwnerThread(current);
return true;
}

写锁的释放和ReentrantLock的释放过程基本类似, 每次释放都减少写状态, 当写状态为 0 时表示写锁已被释放, 从而等待的读写线程可以继续访问写锁, 同时前次写线程的修改对后续线程是可见的

读锁的获取和释放

读锁是一个支持重进入的共享锁, 它能够被多个线程同时获取, 在没有其他写线程访问时, 读锁总能成功获取到. 如果当前线程已经获取到了读锁, 则增加读状态. 如果当前线程在试图获取读锁时, 写锁已经被其他线程获取, 则该线程进入等待状态

获取读锁的代码如下所示

protected final int tryAcquireShared(int unused) {
for (;;) {
int c = getState();
int nextc = c + (1 << 16);
if (nextc < c) {
throw new Error("Maximum lock count exceeded");
}
if (exclusiveCount(c) != 0 && owner != Thread.currentThread()) {
return -1;
}
if (compareAndSetState(c, nextc)) {
return 1;
}
}
}

锁降级

锁降级是指写锁降级为读锁. 具体来讲, 锁降级是指把持住当前拥有的写锁, 然后获取读锁, 最后在释放写锁的过程

锁降级中是否有必要获取读锁? 答案是必要的, 目的是为了数据的可见性. 如果一个线程不获取读锁而直接释放了写锁, 此时若另外一个线程获取到了写锁并修改了数据, 当前线程就无法感知到数据另一个线程的数据更新. 假设当前线程获取了读锁, 遵循锁降级的步骤, 另一个试图获取写锁的线程就会被阻塞, 直到当前线程使用完数据并释放掉读锁之后, 另一个线程才能获取到写锁进行更新