一个用来实现同步锁以及其他涉及到同步功能的核心组件
AQS 全称为 AbstractQueuedSynchronizer,位于 java.util.concurrent.locks 包下,作者是著名的并发包大神 Doug Lea(java.util.concurrent 下的类基本都是他写的),AQS 提供了一个 FIFO 队列,用来构建锁和同步器。
AQS 的资源共享方式
- 独占锁(Exclusive),每次只能有一个线程持有锁,ReentrantLock
- 共享锁(Share),允许多个线程同时获取锁,并发访问共享资源,ReentrantReadWriteLock
AQS 的原理
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个 Node 来实现锁的分配。
AQS 使用一个 int 变量 state 来表示同步状态,通过队列来完成获取资源线程的排队工作,AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
/**
* The synchronization state.
*/
private volatile int state;
与 state 相关的方法:
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
独占锁(Exclusive)和共享锁(Share)
-
Exclusive
以 ReentrantLock 为例,只有一个线程能执行,又可分为公平锁和非公平锁,非公平锁会有更好的性能。
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁:当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒
ReentrantLock 默认采用非公平锁,通过构造函数传入 boolean 来决定是否用公平锁。
/** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock 中公平锁的 lock 方法:
static final class FairSync extends Sync { final void lock() { acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
ReentrantLock 中非公平锁的 lock 方法:
static final class NonfairSync extends Sync { final void lock() { // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 这里没有对阻塞队列进行判断 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁和非公平锁不同之处:
- 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了
- 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,自动排到后面
如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
-
Share
多个线程可同时执行。ReentrantReadWriteLock 读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了。
AQS 的模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是:
- 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法
- 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法
自定义同步器需要重写下面几个 AQS 提供的模板方法:
/**
* 该线程是否正在独占资源,只有用到condition才需要去实现它。
*/
protected boolean isHeldExclusively();
/**
* 独占方式。尝试获取资源,成功则返回true,失败则返回false。
*/
protected boolean tryAcquire(int arg);
/**
* 独占方式。尝试释放资源,成功则返回true,失败则返回false。
*/
protected boolean tryRelease(int arg);
/**
* 共享方式。尝试获取资源。
* 负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
*/
protected int tryAcquireShared(int arg);
/**
* 共享方式。尝试释放资源,成功则返回true,失败则返回false。
*/
protected boolean tryReleaseShared(int arg);
以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock() 到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到 0 的。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一种即可。
但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。