Java并发(3):AQS

AQS 的全称为 AbstractQueuedSynchronizer ,抽象队列同步器。

AQS 就是一个抽象类,主要用来构建锁和同步器。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}

AQS 为构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLockSemaphore,其他的诸如 ReentrantReadWriteLockSynchronousQueue等等皆是基于 AQS 的。

AbstractQueuedSynchronizer是个抽象类,所有用到方法的类都要继承此类的若干方法,对应的设计模式就是模版模式。模版模式定义:一个抽象类公开定义了执行它的方法的方式/模板。它的子类可以按需要重写方法实现,但调用将以抽象类中定义的方式进行。这种类型的设计模式属于行为型模式。

为实现阻塞锁和相关的同步器提供一个框架,是一个抽象类的队列式同步器

1 CLH

image.png

CLH队列,全称Craig-Landin-Hagersten队列,是一种基于链表结构的自旋锁等待队列。它由一系列节点组成,每个节点代表一个等待锁的线程。这些节点按照FIFO(先进先出)的原则组织在一起,形成一个双向队列。

在CLH队列中,每个节点都包含一个指向前驱节点的引用和一个指向后继节点的引用。此外,每个节点还维护一个表示锁状态的标志位。当一个线程需要获取锁时,它会检查前驱节点的锁状态。如果前驱节点持有锁,当前线程就会进入自旋等待状态,不断检查前驱节点的锁状态,直到前驱节点释放锁为止。一旦前驱节点释放锁,当前线程就可以获取锁并执行相应的操作。

自旋锁没有抢到锁的线程会一直自旋等待锁的释放,处于busy-waiting的状态,此时等待锁的线程不会进入休眠状态,而是一直忙等待浪费CPU周期。因此自旋锁适用于锁占用时间短的场合。

互斥锁说的是传统意义的互斥锁,就是多个线程并发竞争锁的时候,没有抢到锁的线程会进入休眠状态即sleep-waiting,当锁被释放的时候,处于休眠状态的一个线程会再次获取到锁。缺点就是这一些列过程需要线程切换,需要执行很多CPU指令,同样需要时间。如果CPU执行线程切换的时间比锁占用的时间还长,那么可能还不如使用自旋锁。因此互斥锁适用于锁占用时间长的场合。

CLH锁其实就是一种是基于逻辑队列非线程饥饿的一种自旋公平锁,由于是 Craig、Landin 和 Hagersten三位发明,因此命名为CLH锁。

CLH锁原理如下:

  1. 首先有一个尾节点指针,通过这个尾结点指针来构建等待线程的逻辑队列,因此能确保线程线程先到先服务的公平性,因此尾指针可以说是构建逻辑队列的桥梁;此外这个尾节点指针是原子引用类型,避免了多线程并发操作的线程安全性问题;

  2. 通过等待锁的每个线程在自己的某个变量上自旋等待,这个变量将由前一个线程写入。由于某个线程获取锁操作时总是通过尾节点指针获取到前一线程写入的变量,而尾节点指针又是原子引用类型,因此确保了这个变量获取出来总是线程安全的。

CLH原理解析 - 我爱2B哥の博客 (gaoyeming.github.io)
AQS基础——多图详解CLH锁的原理与实现 - 知乎 (zhihu.com)
深入解析CLH队列:原理、应用与优势-CSDN博客算法:CLH锁的原理及实现_clh算法-CSDN博客

2 Java中的CAS

compare and swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。执行CAS操作的时候,将内存位置的值与预期原值比较:如果相匹配,那么处理器会自动将该位置值更新为新值,如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。

CAS (CompareAndSwap) 有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来。

  • 基本类型原子类 -AtomicLong、AtomicInteger、AtomicBoolean

  • 数组类型原子类- AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

  • 引用类型原子类- AtomicReference、AtomicStampedReference、AtomicMarkableReference

源码级别的讲解JAVA 中的CAS_cas源码-CSDN博客

3 AQS原理

CountDownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁 实现的,即将暂时获取不到锁的线程加入到等待队列中。

CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。在 CLH 同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

AQS 使用 int 成员变量 state 表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。

image.png

state 变量由 volatile 修饰,用于展示当前临界资源的获锁情况。

// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

状态

  • getState():返回同步状态

  • setState(int newState):设置同步状态

  • compareAndSetState(int expect, int update):使用CAS设置同步状态

  • isHeldExclusively():当前线程是否持有资源

独占资源(不响应线程中断)

  • tryAcquire(int arg):独占式获取资源,子类实现

  • acquire(int arg):独占式获取资源模板

  • tryRelease(int arg):独占式释放资源,子类实现

  • release(int arg):独占式释放资源模板

共享资源(不响应线程中断)

  • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现

  • acquireShared(int arg):共享式获取资源模板

  • tryReleaseShared(int arg):共享式释放资源,子类实现

  • releaseShared(int arg):共享式释放资源模板

3.1 内部类Node

image.png

3.1.1 waitStatus等待状态

image.png

3.2 条件变量

image.png

谈谈Java多线程离不开的AQS_java中aqs-CSDN博客

4 AQS实现

  • tryAcquire

  • tryRelease

  • tryAcquireShared

  • tryReleaseShared

  • isHeldExclusively

public class MyAQS {
private final Sync sync = new Sync();

public void lock(){
sync.acquire(1);
}

public void unlock(){
sync.release(1);
}

public boolean isLocked(){
return sync.isHeldExclusively();
}

/**
* 静态内部类,继承AQS,重写钩子方法
*/
private class Sync extends AbstractQueuedSynchronizer {
//独占方式,尝试获取资源
public boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
System.out.println(Thread.currentThread().getName()+"获取锁成功");
return true;
}
System.out.println(Thread.currentThread().getName()+"获取锁失败");
return false;
}
//独占方式,尝试释放资源
public boolean tryRelease(int arg) {
if(getState() == 0) {
throw new IllegalMonitorStateException();
}

setExclusiveOwnerThread(null);
setState(0);
System.out.println(Thread.currentThread().getName()+"释放锁成功!");
return true;
}

protected boolean isHeldExclusively() {
return getState() == 1;
}
}
}

测试类

public class MyAQSTest {
private MyAQS myAQS=new MyAQS();
public void use(){
myAQS.lock();
try{
Thread.sleep(1000);
}catch(InterruptedException e){
e.printStackTrace();
}finally {
myAQS.unlock();
}
}
public static void main(String[] args) {
MyAQSTest test=new MyAQSTest();
for(int i=0;i<3;i++){
new Thread(()->{
test.use();
}).start();
}
}
}

美团一面,面试官让介绍AQS原理并手写一个同步器,直接凉了 (qq.com)

5 同步工具类

5.1 Semaphore(信号量)

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,而Semaphore(信号量)可以用来控制同时访问特定资源的线程数量。

5.2 CountDownLatch(倒计时器)

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

5.3 CyclicBarrier(循环栅栏)

CyclicBarrierCountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

AQS 详解 | JavaGuide
Java并发之AQS详解 - waterystone - 博客园 (cnblogs.com)