Java 并发常见问题总结(4)
Java线程出现异常,进程为啥不会退出?
因为Java是采用线程独立模型,各个线程之间互相独立,有各自的上下文,当一个线程出现错误的时候,只会影响到这个线程自己本身,不会影响到其它的线程,更不会导致程序退出
不过我们这里介绍的异常更多是Exception,如果是error级别的,通常意味着硬件层面不够,才有可能会导致退出
此外Exception我们是可以通过捕获的,捕获了的话也不会导致线程直接死掉
Java是如何判断一个线程是否存活的?需要注意什么吗?
通过isAlive() 方法:
public class Main { public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { System.out.println("t1 begin"); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("t1 end"); }); t1.start(); System.out.println("t1.isAlive()="+t1.isAlive()); t1.join(); // main 会在这里阻塞等到t1执行完 System.out.println("t1.isAlive()="+t1.isAlive()); } } 但是没那么简单,可以看一下以下代码的输出
public class Main { public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { System.out.println("t1 begin"); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("t1 end"); }); Thread t2 = new Thread(() -> { synchronized (t1) { System.out.println("t2 begin"); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("t2 end"); System.out.println("t1 isAlive:" + t1.isAlive()); } }); t1.start(); t2.start(); } }t1 begin t2 begin t1 end t2 end t1 isAlive:true出现这样的原因就是当一个线程执行完成之后,需要先拿到线程对象(t1)的锁才能去修改线程状态,但是因为这个时候这把锁被线程2拿着,所以线程1没办法修改状态
那导致的现象t1线程已经退出,但是状态始终是存活
int a=1是原子性操作吗?User a = new User(); 是原子性操作吗?
int a=1
在单线程的情况下可以认为是原子性的,这个语句本身只是声明一个变量并赋值为1
但是在多线程情况下就不一定能做到原子性,可能会有其它的赋值操作导致覆盖问题
例如
int a = 0; // 线程1 a = 1; // 线程2 a = 2;在多线程环境下,我们无法保证线程执行的先后顺序,所以无法做到原子性
为了做到原子性,一般会使用锁机制或是原子类(CAS)
User a = new User(); 不是原子操作
它底层其实需要先初始化一个内存空间、在内存空间创建对象、变量指向内存空间这几步
但是在多线程环境下且伴随指令重排的话,可能会导致某一个线程拿到的其实是没初始化好的对象
happens-before和as-if-serial有啥区别和联系?
其实这两个是JMM中比较重要的两个设计语义
happens-before 主要是解决多线程环境下线程之间的可见性问题,比如线程A对加了 volatile 变量进行修改,且A线程执行顺序是快于线程B的,那么执行线程B的时候一定要能看见线程A的最新修改
as-if-serial 主要解决单线程环境下的有序性问题,也就是单线程环境下,无论底层的指令经过怎么样的重排序,最终的结果都应该和代码预期的结果一直
ForkJoinPool和ThreadPoolExecutor区别是什么?
主要的区别:
- 工作方式(算法):ThreadPoolExecutor是任务分配,ForkJoinPool是工作窃取
- 线程数量:ThreadPoolExecutor需要手动设置,ForkJoinPool自动扩缩容
- 任务队列:ThreadPoolExecutor所有线程共享一个全局队列,ForkJoinPool每个线程都有自己的队列,然后还有一个全局队列
- 适合类型:ThreadPoolExecutor比较适合那种IO比较密集且任务不可分割,ForkJoinPool适合任务可以拆分的计算类型任务
任务分配 VS 工作窃取
- 任务分配就是每个线程到全局队列领取任务,如果领取不到就会阻塞等待
- 工作窃取就是每个线程会维护自己的一个工作队列,然后执行完之后会主动去看一下其它线程是否执行完成,如果没执行完会偷一些任务过来执行
ThreadPoolExecutor 和 ForkJoinPool 的全局队列存储的其实都是外部提交给线程池大的任务
只不过ForkJoinPool 内部会对大的任务进行拆分成一个个的小任务,然后每个线程会自己fork一部分小任务到自己的工作队列中去执行
CompletableFuture 为啥默认用 ForkJoinPool?
因为CompletableFuture的场景主要是将多个线程的执行结果进行合并或是用来编排线程的执行顺序,更像是拆分执行完再合并,和ForkJoinPool的能力比较契合
但是对于一些IO密集型的任务,最好还是不要直接默认ForkJoinPool,因为IO阻塞住某一个线程的话,反而会卡慢整个任务进度
那说白了ForkJoinPool其实就是把任务拆分成很多的小任务交给多个线程去执行,本质就是压榨cpu的性能,比较适合计算比较多的场景
CountDownLatch、CyclicBarrier、Semaphore区别?
这三个都是多线程情况下的同步工具,或者说是通信工具,但是在具体的功能和实现上还是有一些区别的
CountDownLatch
使用方式:
public class Main { public static void main(String[] args) throws InterruptedException { // 初始的时候计时器为3 CountDownLatch countDownLatch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { new Thread(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e){} System.out.println("线程执行"); countDownLatch.countDown(); // 计数器减1 }).start(); } countDownLatch.await(); // 这里会阻塞等到计数器为0 System.out.println("主线程执行"); } } CountDownLatch 其实是一个计数器,主要提供了两个api
- countDownLatch.countDown() 实现将计数器减1
- countDownLatch.await(); 会阻塞等到计数器的数量变成0才执行
这个计数器主要的作用就是实现线程等待,比如一个线程或是多个线程的执行需要等待其它的线程执行完才能执行,就可以使用这个计数器
底层具体实现:实现了AQS,并且使用state表示数量
CyclicBarrier
使用方式:
public class Main { public static void main(String[] args) throws InterruptedException { // 初始化的时候执行需要等待多少个线程 CyclicBarrier barrier = new CyclicBarrier(3, () -> { System.out.println("所有线程执行到这"); }); for (int i = 0; i < 3; i++) { int idx = i; new Thread(() -> { System.out.println("线程执行" + idx + "到这"); try { // count -- 底层判断到count--操作后到0会放行,否则阻塞 barrier.await(); } catch (Exception e) {} System.out.println("线程执行" + idx + "继续执行"); }).start(); } } }线程执行0到这 线程执行2到这 线程执行1到这 所有线程执行到这 线程执行2继续执行 线程执行0继续执行 线程执行1继续执行这个的作用同样和计数器有点像,不过它主要是用来实现同步屏障
底层其实维护了一个count,每调用一次barrier.await();,count就会减1,然后会判断,当减少到0的时候会执行初始化的时候塞入的任务;如果减少后不为0会阻塞线程
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped Runnable command = barrierCommand; if (command != null) { try { command.run(); } catch (Throwable ex) { breakBarrier(); throw ex; } } nextGeneration(); return 0; } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }不直接继承 AQS,而是通过 ReentrantLock + Condition 实现(ReentrantLock 底层还是 AQS)。
Semaphore
使用方式:
public class Main { public static void main(String[] args) throws InterruptedException { // 第一个参数是许可证数量,第二个参数是是否开启公平模式 Semaphore semaphore = new Semaphore(3, true); for (int i = 0; i < 10; i++) { int idx = i; new Thread(() -> { try { System.out.println("线程" + idx + "准备拿许可证"); semaphore.acquire(); // 拿许可证,拿不到会阻塞 System.out.println("线程" + idx + "拿到拿许可证并执行"); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { semaphore.release(); // 释放一个许可证 System.out.println("线程" + idx + "释放许可证"); } }).start(); } 这个同样和计数器很像,不过它更像是一种信号量的方式,初始化的时候指定许可证数量并指定是否要公平模式
- 每次semaphore.acquire();会尝试去拿许可证,拿到会将许可证减1,拿不到就阻塞
- semaphore.release(); 会释放一个许可证
Semaphore 底层主要是依赖AQS来实现的
那我们可以总结一下:
CountDownLatch、CyclicBarrier、Semaphore 这三个其实说白了都是通过计数的方式实现线程同步的
不过具体的用途和功能有区别
CountDownLatch 就是简单的一个计数器,调用
- countDownLatch.countDown() 实现将计数器减1
- countDownLatch.await(); 会阻塞等到计数器的数量变成0才执行
CyclicBarrier 则是一个屏障,调用
- barrier.await(); 会在底层将count-- 并判断是否已经为0,为0的话会唤醒和放行前面阻塞的线程,不为0则阻塞
Semaphore 是一个信号量机制,初始化的时候我们就要执行许可证的数量,调用
- semaphore.acquire();会尝试去拿许可证
- semaphore.release(); 会释放一个许可证
CountDownLatch适用于一个线程等待多个线程完成操作的情况
CyclicBarrier适用于多个线程在同一个屏障处等待
Semaphore适用于一个线程需要等待获取许可证才能访问共享
CompletableFuture对比FutureTask有什么优势?
CompletableFuture 的使用方式:https://www.yuque.com/jaychou-avdol/bhpczz/yfyyemfuqdn5reqs
主要的区别从API的丰富度就可以看出
首先就是FutureTask只能支持同步获取,也就是get() 方法
而CompletableFuture可以支持同步和异步两种方式,主要就是带Async的那几个,比如thenAcceptAsync,以及handle之类的
CompletableFuture 支持链式编程,而且提供了很多的方法,而FutureTask只能通过get() 拿到结果之后再去进行操作,而且很多还要自己手动实现,比如异常捕获
CompletableFuture 支持对多任务的编排,而FutureTask不支持多任务的操作,只能我们自己手动去编排
CompletableFuture 光是对异常的处理就有好几个API,而且有些API支持只有异常出现才会回调,相当于把异常单独拎出来处理,而FutureTask只能通过try-catch去包裹get() 方法才能处理异常
总结下来FutureTask的设计初衷是让我们能对任务的执行结果,异常信息,以及对任务状态的判断和取消,主要是初步的完成这些功能,而CompletableFuture相当于一个全能版,是对FutureTask的一个全面升级,除了FutureTask的基本功能之外,还多考虑了更优雅的处理执行结果和异常的方式,以及支持异步操作避免阻塞主线程,并且重点集成了对多线程编排的功能
CompletableFuture的底层是如何实现的?
那我们为了了解CompletableFuture,可以从下面这三个维度入手
CompletableFuture 对比Future有什么优势?为什么要封装CompletableFuture?
Future 其实是jdk8以前用异编程的工具,主要就是提交一个异步任务+获取结果
- 但是Future具有很多缺点,这些缺点都在CompletableFuture得到解决
- 无法手动设置值,只能等任务执行完成,在一些失败场景不好处理
- 没办法显示处理异常,只能通过get() 的时候抛出异常才去处理
- 多任务同步等待的时候需要借助 CountDownLatch 才能实现
- get() 方法是阻塞的
- Future 无链式调用,代码看起来会比较乱
总结就是Future主要实现了提交异步任务并获取执行结果的基本需求,而CompletableFuture实现了 异步任务编排、异常处理、任务组合、非阻塞回调
CompletableFuture常见的API是什么?有什么区别?
执行异步任务:
public class Main { public static void main(String[] args) throws InterruptedException { // supplyAsync 有返回值 // 不指定线程池执行,默认使用 ForkJoinPool CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("111"); return null; }); // 指定线程,使用我们自己指定的线程去执行 CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("111"); return null; }, Executors.newFixedThreadPool(10)); // runAsync 无返回值 // 不指定线程池执行,默认使用 ForkJoinPool CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> { System.out.println("111"); }); // 指定线程,使用我们自己指定的线程去执行 CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> { System.out.println("111"); }, Executors.newFixedThreadPool(10)); } }其实就两个API,取决于你需不需要执行结果
- supplyAsync 有返回值
- runAsync 无返回值
然后执行时使用的线程池都是,有指定时使用指定的线程池,无指定时使用默认的ForkJoinPool
链式处理API:
处理前一个结果(有返回值)
public class Main { public static void main(String[] args) throws InterruptedException { // 同步回调 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("111"); return 222; }).thenApply(res -> { System.out.println(res); return res; }); // 异步回调 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("111"); return 222; }).thenApplyAsync(res -> { System.out.println(res); return res; }); } }无论是thenApplyAsync还是thenApply,其实本质上就是拿到上一个future处理之后return的结果,然后进行处理,只不过是使用同步还是异步的方式而已,使用异步的话,如果没有指定线程池,同样是用默认的ForkJoinPool
处理前一个结果(无返回值)
public class Main { public static void main(String[] args) throws InterruptedException { // 同步回调 CompletableFuture.supplyAsync(() -> { System.out.println("111"); return 222; }).thenAccept(res -> { System.out.println(res); }); CompletableFuture.supplyAsync(() -> { System.out.println("111"); return 222; }).thenAccept(res -> { System.out.println(res); }); } }这两个api的本质区别就是是否异步
注意:消费和处理的区别,消费就是消费完之后到这里就结束了,而处理是处理完之后继续返回出去
在上一步执行完成之后继续执行接下去的逻辑(无返回值)
public class Main { public static void main(String[] args) throws InterruptedException { // 同步回调 CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("111"); return 222; }).thenRun(() -> { System.out.println("222"); }); CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("111"); return 222; }).thenRunAsync(() -> { System.out.println("222"); }); } }不关心上一步的结果,而是直接执行下一步,适合做一些收尾动作
串行执行任务:
public class Main { public static void main(String[] args) throws InterruptedException { // 串行执行 CompletableFuture<Object> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("111"); return 222; }).thenCompose(res -> { System.out.println(res); return null; }); } }串行的执行多个任务,不过一般用的不多,要串行没必要使用这个
并行执行多个任务:
//异步通知下游系统 CompletableFuture<Void> allFutures = CompletableFuture.allOf(noticeDetails.stream() .map(detail -> CompletableFuture.supplyAsync(() -> { notice(detail); return null; })).toArray(CompletableFuture[]::new)); //所有任务通知成功后,更新代还通知单 allFutures.whenComplete((v, e) -> { if (e == null) { noticeOrder.setState("SUCCESS"); Boolean res = noticeOrderService.updateById(noticeOrder); Assert.isTrue(res, "update failed"); } else { log.error("notice failed", e); } });CompletableFuture 底层怎么实现的?为什么要这么实现?
CompletableFuture 内部采用了一种链式的结构来处理异步计算的结果,每个 CompletableFuture 都有一个与之关联的 Completion 链,它可以包含多个 Completion 阶段,每个阶段都代表一个异步操作,并且可以指定它所依赖的前一个阶段的计算结果。
CompletableFuture 还使用了一种事件驱动的机制来处理异步计算的完成事件。在一个 CompletableFuture 对象上注册的 Completion 阶段完成后,它会触发一个完成事件,然后 CompletableFuture 对象会执行与之关联的下一个 Completion 阶段。
CompletableFuture 的异步计算是通过线程池来实现的。CompletableFuture在内部使用了一个ForkJoinPool线程池来执行异步任务。当我们创建一个CompletableFuture对象时,它会在内部创建一个任务,并提交到ForkJoinPool中去执行。
CAS在操作系统层面是如何保证原子性的?
底层是通过硬件层面提供的原子指令实现的
在x86架构的cpu中,通常是使用cmpxchg指令对总线进行锁住,防止在这个的期间有其它cpu来访问,并且在这个期间还会禁止cpu出现中断,执行完才会释放
所以可以总结一下就是通过底层的指令对数据进行上锁,然后执行完释放
此外因为cmpxchg是基于cpu缓存一致性协议实现的,所以还能做到可见性
CAS一定有自旋吗?
不一定,但是一般会采用自旋的方式
因为CAS是比较并交换,多线程同时操作的情况下成功率较低,所以一般会采用自旋的方式提高成功率
短时间的自旋性能是不错的,至少比直接阻塞线程,然后等待唤醒要好,但是如果长时间自旋的话就不适合了,自旋是一直占用cpu的,cpu没办法干别的事情,导致系统性能下降
最好就是有次数的自旋
读写锁了解吗?
了解一点。主要是ReentrantReadWriteLock
它里面封装了写锁和读锁
public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.tryWriteLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public int getHoldCount() { return sync.getWriteHoldCount(); } } public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryReadLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { throw new UnsupportedOperationException(); } public String toString() { int r = sync.getReadLockCount(); return super.toString() + "[Read locks = " + r + "]"; } }读锁是共享的,写锁是互斥的,读写也是互斥的
state字段的高16位表示读锁的重入次数,低16位表示写锁的重入次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }可以看一下写锁和读锁加锁时有什么区别:
写锁:
@ReservedStackAccess protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); // 当前确实有人持有锁 if (c != 0) { // 写锁为0或是锁不属于当前线程都无法加锁成功 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 锁确实是当前线程的,但是重入次数过多,所以需要报错 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 重入次数加1 setState(c + acquires); return true; } // 当前没人持有任何锁,尝试CAS加锁,并将当前持有线程设置为自己 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }流程总结:
- 判断当前是否有人持有锁(读锁/写锁)
- 当前有人持有
- 判断是否是写锁且持有者为自己
- 不是直接返回失败
- 是就判断重入次数是否溢出,不溢出就重入次数+1
- 当前无人持有
- CAS尝试加锁并设置持有线程为自己
可以看见,当一个线程已经拥有读锁的时候,并没有办法升级为写锁,主要是因为读锁是共享的,升级的话可能造成死锁
读锁:
@ReservedStackAccess protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果已经有人加了写锁并且这个锁的拥有者不是自己,直接返回错误 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 当前线程是第一个加读锁的线程 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } // 判断已经拥有读锁的线程是自己,重入次数+1 else if (firstReader == current) { firstReaderHoldCount++; } // 将当前线程Id记录下来并记录重入次数 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } // 兜底 return fullTryAcquireShared(current); }流程总结:
- 判断是否有人加了写锁且该锁属于自己
- 不是,直接快速失败
- 如果是说明允许继续拿读锁
- 判断自己是否是第一个拿读锁的线程,是的话,重入次数+1
- 将自己的线程信息和重入次数信息进来起来
读锁解锁时:
@ReservedStackAccess protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 当前线程是第一个加读锁者 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } // 不是第一个,说明是使用ThreadLocal进行存储 // 找到对应的ThreadLocal之后进行移除 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // CAS减少重入次数 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }可以发现重入线程的记录这一块是分开存储的
第一个线程是通过变量的方式存储,只有第二个线程才是ThreadLocal存储
主要是为了在只有一个线程加锁的情况下进行提速
HoldCounter主要就是记录 线程id和重入次数
如何理解AQS?
AQS就是抽象队列同步器,是JUC下的一个同步器基础框架,常见的Semaphore, ReentrantLock、CountDownLatch 都是基于AQS实现的
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {在AQS内部,维护了一个FIFO队列和一个volatile的int类型的state变量。在state=1的时候表示当前对象锁已经被占有了,state变量的值修改的动作通过CAS来完成。
源码:
/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { return U.compareAndSetInt(this, STATE, expect, update); }// Node类用于构建队列 static final class Node { // 标记节点状态。常见状态有 CANCELLED(表示线程取消)、SIGNAL(表示后继节点需要运行)、CONDITION(表示节点在条件队列中)等。 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // 节点中的线程,存储线程引用,指向当前节点所代表的线程。 volatile Thread thread; } // 队列头节点,延迟初始化。只在setHead时修改 private transient volatile Node head; // 队列尾节点,延迟初始化。 private transient volatile Node tail; // 入队操作 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 必须先初始化 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }可以发现底层的队列其实是通过Node节点组成的双向链表实现的(Node是AQS的一个内部类)
当线程抢锁失败就会被封装成一个Node节点并放入这条链表的尾部,也就是存放进队尾

同时,当有线程释放锁的时候会唤醒队头节点的线程

然后无论是公平锁还是非公平锁,唤醒的逻辑都是一样的,都是唤醒队头节点
- 但是公平锁就是唤醒之后,头节点会 调用
tryAcquire时,hasQueuedPredecessors()会返回false,那么就会直接把锁给到头节点,而新来的线程自然就抢不到 - 非公平就是唤醒之后还要通过CAS去抢夺,因为这个时候可能会有新的线程进来,不一定能抢过
为什么公平模式下,基本等价于把锁给队头节点,那为何还要调用tryAcquire去进行CAS抢锁呢?
因为state是volatile修饰的,它的目的是做到可见性,并且对state的修改只能通过CAS实现,这样才能保证多线程下的可见性、有序性以及原子性
所以直接通过这种形式上的CAS抢夺反而是更加合理的
从本质上看,AQS提供了两种锁:
分别是排它锁和共享锁。
- 排它锁就是存在多线程竞争同一共享资源时,同一时刻只允许一个线程访问该共享资源,也就是多个线程中只能有一个线程获得锁资源,比如Lock中的ReentrantLock重入锁实现就是用到了AQS中的排它锁功能。
- 共享锁也称为读锁,就是在同一时刻允许多个线程同时获得锁资源,比如CountDownLatch和Semaphore都是用到了AQS中的共享锁功能。
AQS 总共有两种队列,一种是同步队列,用于实现锁的获取和释放。还有一种是条件队列,条件队列也是一个FIFO队列,用于在特定条件下管理线程的等待和唤醒。
AQS提供了两种模式来支持不同类型的同步器:独占模式和共享模式。
AQS为什么采用双向链表?
为什么选择链表,而不是数组?
因为数组实现队列的话移除操作比较多,数组的移除性能不是很好,此外就是数组需要扩容,所以使用队列比较合适
为什么使用双向链表而不是单链表?
双链表能支持双向遍历,无论是进行数据插入还是删除都会更加的灵活
单链表的优势则是内存比较省
但是对于需要在链表中进行元素的插入和删除的场景来说,双向链表无疑是更加适合的
看一下AQS里面的具体场景:
获取同步状态,AQS提供了两种API,一种是支持中断,一种不支持
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }/** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }在这个方法的finally中执行了一个cancelAcquire的动作,也就是说在acquire的过程中,如果出现了线程中断异常,那么就会执行这个方法,他的主要作用就是将中断的线程节点从AQS同步队列中移除。
而涉及到将具体某一个节点移除,使用双向链表会更加的合适,单向链表还需要从头遍历找到这个节点才能删除
高效的挂起支持
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果ws为Node.SIGNAL(值为-1), //这意味着前驱节点已经处于等待状态,期望在释放同步状态时唤醒后继节点。 //在这种情况下,方法可以直接返回true,指示当前线程可以安全地挂起。 return true; if (ws > 0) { //如果ws大于0,说明前驱节点已被取消。 //此时,循环向前遍历等待队列,跳过所有已取消的节点,直到找到一个未被取消的节点作为新的前驱节点,并更新相应的链接。 //这个过程是为了维护等待队列的健康状态,移除其中的无效节点。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前驱节点的等待状态不是SIGNAL(也即,它是0或PROPAGATE), //则将前驱节点的等待状态更新为SIGNAL。 //这是通过compareAndSetWaitStatus方法完成的,它安全地修改节点的状态,以指示当前节点(node)在释放同步状态时需要被唤醒。 //这里并不立即挂起当前线程,而是返回false,让调用者知道它需要再次尝试获取同步状态,在无法获取时再决定挂起。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }当一个线程获取锁失败后,就需要根据前面的节点来判断自己接下来的操作,是被挂起,还是继续尝试
而不是直接在一个死循环中无限的等待
可以发现上面的这个过程就需要查询前一个节点的状态来决定当前节点的状态,所以需要使用双向链表,如果是单向链表还要从头开始遍历
高效的判断一个线程是否在等待队列中
public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) if (p.waiter == thread) return true; return false; }这里是采用从后向前遍历,因为新加入的线程会塞入队尾,从后向前遍历可能可以更快的找到(虽然概率不高)
可以发现这里需要支持从后向前遍历,所以比如使用双向链表
支持条件队列
AQS还支持条件变量,这允许线程在特定条件满足之前挂起。条件队列需要能够从等待队列中移动节点到条件队列,以及反向操作。双向链表使得这种操作更加直接和高效。
AQS的核心组成介绍一下?分别有什么用?
总的来说就是:
- 1个同步状态
- 1个同步队列
- 2套模板方法
- 若干工具方法
1个同步状态指的就是
private volatile int state;加volatile是为了保证有序性+可见性,搭配上CAS的原子性,能适配高并发场景
有什么用?
这个看具体的实现类的场景
- 独占锁:表示资源是否被占用
- 共享锁:表示资源数量
- 读写锁:高16位表示读锁重入数,低16位表示写锁重入数
一个同步队列是指Node节点堆起来的双向链表实现的FIFO队列
abstract static class Node { volatile Node prev; // initially attached via casTail volatile Node next; // visibly nonnull when signallable Thread waiter; // visibly nonnull when enqueued volatile int status; // written by owner, atomic bit ops by others }主要就是标识封装的线程以及这个节点的前后驱节点以及节点状态
同步队列主要是用来存储抢锁失败的那些线程,它们会被阻塞起来并封装成节点塞入队尾等待被唤醒重新抢锁
2套模板方法主要是指AQS对共享和独占的具体实现
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false; Node pred = null; // predecessor of node when enqueued /* * Repeatedly: * Check if node now first * if so, ensure head stable, else ensure valid predecessor * if node is first or not yet enqueued, try acquiring * else if node not yet created, create it * else if not yet enqueued, try once to enqueue * else if woken from park, retry (up to postSpins times) * else if WAITING status not set, set and retry * else park and clear WAITING status, and check cancellation */ for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }独占 acquire()、acquireInterruptibly()、release()
共享 acquireShared()、releaseShared()
最较新的jdk版本中大部分逻辑已经抽象封装成一套接口,在接口内部进行判断处理,比如acquire方法
若干工具方法
主要就是一些调用底层的UnSafe实现的方法,比如CAS以及阻塞和唤醒线程
AQS的核心设计思想了解吗?
AQS复制封装一些模板方法,AQS本身是一个抽象类,子类继承之后通过对方法进行重写,从而实现自己想要的逻辑
AQS对公平锁和非公平锁的实现说一下?
具体的区别就是在每次唤醒的时候都是唤醒队头元素,不过公平实现的情况下,CAS的最终结果会是队头元素成功获取到锁,在这个过程中如果有新来的线程就会因为获取不到而进入同步队列等待
非公平就是CAS的时候有可能会让新来的线程成功拿到锁
AQS的acquire的流程聊一下?和acquireInterruptibly有什么区别吗? 为什么 acquire 要忽略中断?
acquire()
public final void acquire(int arg) { if (!tryAcquire(arg)) // 抢锁失败 acquire(null, arg, false, false, false, 0L); }/** * 核心的锁获取方法,是AQS(抽象队列同步器)的核心实现 * @param node 封装当前线程的节点(初始为null,后续创建) * @param arg 获取锁的参数(如重入锁的重入次数,共享锁的许可数) * @param shared 是否为共享模式(true=共享,false=独占) * @param interruptible 是否可中断(true=响应中断,false=忽略中断) * @param timed 是否超时获取(true=有超时时间,false=无限等待) * @param time 超时时间(纳秒),仅timed=true时有效 * @return 1=获取成功;其他值=取消获取(如中断/超时) */ final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { // 当前请求锁的线程 Thread current = Thread.currentThread(); // spins:初始自旋次数;postSpins:唤醒后重试自旋次数(用于减少不公平性) byte spins = 0, postSpins = 0; // interrupted:标记线程是否被中断;first:标记当前节点是否为队列第二个节点(head的后继) boolean interrupted = false, first = false; // pred:当前节点的前驱节点(入队后赋值) Node pred = null; /* * 核心自旋逻辑,循环执行以下操作: * 1. 检查当前节点是否成为队列第二个节点(head的后继) * 2. 若节点是第二个节点或未入队,尝试获取锁 * 3. 若节点未创建,则创建(共享/独占节点) * 4. 若节点未入队,则尝试入队 * 5. 若从park中唤醒,重试(最多postSpins次) * 6. 若节点状态未设置为WAITING,则设置并重试 * 7. 否则阻塞线程,并检查取消条件(中断/超时) */ for (;;) { // 无限自旋,直到获取成功/取消获取 // ========== 第一步:检查节点位置和前驱节点有效性 ========== // !first:还未标记为队列第二个节点 // pred = node.prev:获取当前节点的前驱 // !(first = (head == pred)):判断前驱是否为head(即当前节点是否为第二个节点) if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { // 前驱节点状态<0:前驱已取消(CANCELLED),清理队列中无效节点 if (pred.status < 0) { cleanQueue(); // 清理队列中已取消的节点 continue; // 清理后重新自旋 } // 前驱节点的前驱为null(前驱不是head),说明队列未稳定,自旋等待 else if (pred.prev == null) { Thread.onSpinWait(); // JDK9+优化:提示CPU当前线程自旋,减少空耗 continue; } } // ========== 第二步:尝试获取锁(节点是第二个/未入队) ========== if (first || pred == null) { // 节点是第二个节点 或 未入队(直接尝试获取) boolean acquired; // 是否获取成功 try { // 共享模式:调用tryAcquireShared(需子类实现,返回>=0表示成功) // 独占模式:调用tryAcquire(需子类实现,返回true表示成功) if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { // 获取过程中抛出异常,取消获取并传播异常 cancelAcquire(node, interrupted, false); throw ex; } // 获取锁成功 if (acquired) { if (first) { // 当前节点是队列第二个节点(需要更新head) node.prev = null; // 新head的前驱置null head = node; // 将当前节点设为新的head pred.next = null; // 原head的后继置null(帮助GC) node.waiter = null; // 清空节点关联的线程(释放引用) if (shared) // 共享模式:唤醒后续共享节点(传播唤醒) signalNextIfShared(node); if (interrupted) // 若线程曾被中断,恢复中断状态(保证中断状态不丢失) current.interrupt(); } return 1; // 获取成功,返回1 } } // ========== 第三步:节点未创建则创建 ========== if (node == null) { // 首次自旋,节点尚未创建 if (shared) node = new SharedNode(); // 共享模式:创建共享节点 else node = new ExclusiveNode(); // 独占模式:创建独占节点 } // ========== 第四步:节点未入队则尝试入队 ========== else if (pred == null) { // 节点已创建但未入队(pred=null) node.waiter = current; // 节点关联当前线程 Node t = tail; // 获取队列尾节点 // 松弛设置前驱(避免不必要的内存屏障,提升性能) node.setPrevRelaxed(t); if (t == null) tryInitializeHead(); // 队列为空,初始化head节点 else if (!casTail(t, node)) // CAS更新尾节点(保证原子性) // CAS失败,说明有其他线程竞争入队,回滚前驱设置 node.setPrevRelaxed(null); else t.next = node; // CAS成功,原尾节点的后继指向当前节点 } // ========== 第五步:自旋等待(减少阻塞,提升性能) ========== else if (first && spins != 0) { --spins; // 减少自旋次数(降低不公平性) Thread.onSpinWait(); // 提示CPU自旋,优化性能 } // ========== 第六步:设置节点状态为WAITING(允许被唤醒) ========== else if (node.status == 0) { node.status = WAITING; // 设置状态为等待,允许后续被signal唤醒 } // ========== 第七步:阻塞当前线程(核心等待逻辑) ========== else { // 调整postSpins(指数级增加,最多到Byte.MAX_VALUE) spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) // 非超时模式:无限阻塞,直到被unpark LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) // 超时模式:剩余时间>0,阻塞指定纳秒数 LockSupport.parkNanos(this, nanos); else // 超时时间已到,退出自旋 break; node.clearStatus(); // 唤醒后清空节点状态(重置为0) // 检查线程是否被中断: // 1. Thread.interrupted():清除中断状态并返回是否中断 // 2. 若interruptible=true,中断则退出自旋 if ((interrupted |= Thread.interrupted()) && interruptible) break; } } // 退出自旋(中断/超时),取消获取锁并返回结果 return cancelAcquire(node, interrupted, interruptible); }大概的逻辑就是这样:
- 尝试获取锁,如果成功直接返回,失败进入AQS队列的逻辑
- 将当前线程封装成一个Node节点,加入队列的尾部
- 进入自旋,判断自己是不是队列的第二个节点(head的下一个节点,也可以认为是第一个等待节点),如果出现异常就会将当前节点设置为 CANCELLED 并结束
- 如果是,说明有资格抢锁
- 不是,准备阻塞
- 有资格抢锁的话会去抢锁,成功直接返回,失败同样等待阻塞
- 失败,将节点设置为 WAITING ,并调用park() 将当前线程阻塞住
- 被唤醒后,继续抢锁,如果成功就返回
可以发现以上的逻辑,在出现异常的时候interrupted |= Thread.interrupted()) && interruptible只是记录,只有当interruptible为true时在退出循环,但是这个值调用方传递的是false,也就是只要没拿到锁或不抛出异常就不会退出
acquireInterruptibly()
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted() || (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) throw new InterruptedException(); }其实底层调用的逻辑和前面是一样的,只不过interruptible参数传递的是true,这样就能做到出现异常的时候会及时的抛出,不会忽略
acquire之所以要忽略中断是因为语义决定的,这类锁的特点,我们更加倾向于获取到锁,而不是出现中断退出,当然AQS的实现类基本对两种方式都有实现,例如ReentranLock
AQS是如何实现线程的等待和唤醒的?
这个主要是两个方法,Java中的park()和unpark(),这两个都是本地方法,依赖底层的实现
park() 负责阻塞线程,unpark()负责唤醒线程
当一个线程尝试获取锁或者同步器时,如果获取失败,AQS会将该线程封装成一个Node并添加到等待队列中,然后通过LockSupport.park()将该线程阻塞。
当一个线程释放锁或者同步器时,AQS会通过LockSupport.unpark()方法将等待队列中的第一个线程唤醒,并让其重新尝试获取锁或者同步器。
除了基本的等待和唤醒机制,AQS还提供了条件变量(Condition)的实现,用于在某些条件不满足时让线程等待,并在条件满足时唤醒线程。具体实现是通过创建一个等待队列,将等待的线程封装成Node并添加到队列中,然后将这些线程从同步队列中移除,并在条件满足时将等待队列中的所有线程唤醒。
AQS的同步队列和条件队列原理?
AQS中提供了两种队列,同步队列主要是为了实现锁机制,而条件队列则是为了实现线程间的协调和通信
结论:
- 目的不同:同步队列主要用于管理锁的获取和释放,而条件队列用于等待特定条件的满足。
- 使用方式不同:同步队列是AQS自动管理的,开发者通常不需要直接与之交互;而条件队列是通过Condition接口暴露给开发者的,需要显式地调用等待(await)和通知(signal/signalAll)方法。
- 队列类型不同:虽然它们都是队列结构,但同步队列是所有基于AQS同步器共享的,每个同步器实例只有一个同步队列;条件队列是每个Condition实例特有的,一个同步器可以有多个Condition对象,因此也就有多个条件队列。
同步队列:
同步队列主要用于实现锁的获取和释放。如我们常用的ReentrantLock,就是基于同步队列来实现的。
- 队列是先进先出的实现逻辑
- 同步队列的逻辑就是,当一个线程去尝试获取锁失败的时候会封装成一个Node节点,塞入队列的尾部
- 然后当持有锁的线程将锁释放的时候会唤醒队头元素起来进行抢锁行为
static final class Node { // 前驱和后继节点,构成双向链表 Node prev; Node next; // 线程本身 Thread thread; // 状态信息,表示节点在同步队列中的等待状态 int waitStatus; // ... }private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试快速路径:直接尝试在尾部插入节点 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速路径失败时,进入完整的入队操作 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 队列为空,初始化 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }同步队列的抢锁逻辑主要看
条件队列:
条件队列其实是让开发人员自己来制定规则,能做到当线程不满足一定的条件时就进入条件队列,等待被唤醒
条件队列与同步队列不同,它是基于Condition接口实现的,用于管理那些因为某些条件未满足而等待的线程。当条件满足时,这些线程可以被唤醒。每个Condition对象都有自己的一个条件队列
ConditionObject是AQS的一个内部类(实现了Condition接口),用于实现条件变量。条件变量是并发编程中一种用于线程间通信的机制,它允许一个或多个线程在特定条件成立之前等待,同时释放相关的锁。这在某种程度上类似于对象监视器模式中的wait()和notify()方法,但提供了更灵活和更强大的控制。
public class ConditionObject implements Condition, java.io.Serializable { // 条件队列的首尾节点 private transient Node firstWaiter; private transient Node lastWaiter; // ... }主要的原理:await():使当前线程释放锁并进入等待队列,直到被另一个线程的signal()或signalAll()方法唤醒,或被中断。
ConditionObject 的原理就是把拿到锁,但是不满足特定条件的线程从同步队列移动到条件队列中,等到其满足特定条件之后再从条件队列移动会同步队列重新进行抢锁操作
主要的区别:
- 同步队列为双向链表,只存等锁的线程,而条件队列是一个单项链表,主要是存储那些拿到锁但是不满足特定条件的线程
- await() 方法就是为已经持有锁,但是不满足我们指定的条件,且不在同步队列的线程创建一个新节点并放入条件队列
- signal() 方法是将当前线程从条件队列移动到同步队列中
这里最主要的方法就是await() 以及 signal() ,接下来就拆解一下
拆解await() 方法:它的具体目标就是 释放掉所有锁——进入条件队列——等待被唤醒
public final void await() throws InterruptedException { // 步骤1:检查前置中断(响应中断的第一步) // 含义:如果线程在调用await()前已经被中断,直接抛异常(不允许带着中断状态等) if (Thread.interrupted()) throw new InterruptedException(); // 步骤2:入条件队列(把当前线程封装成Node,加入Condition的单向链表) // addConditionWaiter()逻辑:new一个Node(状态为CONDITION),加到lastWaiter后面 Node node = addConditionWaiter(); // 步骤3:完全释放锁(核心!必须释放锁,不然其他线程拿不到锁) // fullyRelease():释放AQS的state(同步状态),返回释放前的state值(比如重入锁的重入次数) // 为什么“完全释放”?因为线程要去等条件,必须把锁彻底交出去,不能留重入次数 int savedState = fullyRelease(node); // 步骤4:初始化中断模式(记录等待过程中是否被中断、中断的类型) // 0=未中断,THROW_IE=抛异常,REINTERRUPT=补中断标记 int interruptMode = 0; // 步骤5:循环阻塞(只要没回同步队列,就一直等) // isOnSyncQueue(node):判断当前Node是否在AQS的同步队列里(不在就继续等) while (!isOnSyncQueue(node)) { // 阻塞线程(核心!线程进入休眠,直到被signal()/中断唤醒) LockSupport.park(this); // 检查等待期间是否被中断,更新interruptMode // checkInterruptWhileWaiting():返回0=未中断,THROW_IE=被中断且没被signal,REINTERRUPT=被中断但已被signal if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 被中断了,跳出循环,准备处理中断 } // 步骤6:回到同步队列后,重新抢锁 // acquireQueued(node, savedState):就是AQS的抢锁逻辑(循环+阻塞),返回是否被中断 // 如果抢锁过程中被中断,且不是“signal后中断”,则标记为REINTERRUPT(后续补中断) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 步骤7:清理条件队列的无效节点(GC优化) // unlinkCancelledWaiters():遍历条件队列,移除状态为CANCELLED的Node(比如超时/中断的节点) if (node.nextWaiter != null) unlinkCancelledWaiters(); // 步骤8:处理中断(根据interruptMode决定抛异常还是补标记) // reportInterruptAfterWait():THROW_IE抛InterruptedException,REINTERRUPT调用selfInterrupt()补标记 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }具体的流程可以总结一下
- 先判断调用await() 前是否就已经被中断了,如果中断就直接抛异常
- 将当前节点包装成ConditionNode并将状态记录为CONDITION表示等待条件满足,然后该节点会塞到链表尾部
- 将当前节点拥有的锁全部释放,避免其它线程拿不到锁导致死锁现象
- 初始化几个变量,记录在这个过程中是否出现了中断以及中断的类型(后续是补一下补丁就行还是抛异常)
- (循环阻塞)开一个循环进行检查,如果发现当前节点在同步队列中,说明条件满足或是已经出现中断,这两种情况都会跳出循环,否则就继续通过park方法进行阻塞(阻塞的线程只能通过 signal()/中断 唤醒)
- 跳出循环后就会尝试去抢锁,如果出现不需要抛异常的中断就会记录需要打补丁
- 遍历清理掉条件队列中状态为CANCELLED的Node(比如超时/中断的节点)
- 根据上述过程中记录的中断判断是需要打补丁还是抛异常
拆解 signal() 方法: 把条件队列的第一个有效线程,移回同步队列,唤醒它让它抢锁
public final void signal() { // 步骤1:检查权限(必须持有独占锁才能唤醒,否则抛异常) // isHeldExclusively():AQS的方法,子类实现(比如ReentrantLock判断当前线程是否持有锁) // 为什么要检查?防止没拿锁的线程乱唤醒,导致锁语义混乱 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 步骤2:拿到条件队列的头节点(第一个等条件的线程) Node first = firstWaiter; if (first != null) doSignal(first); // 真正执行唤醒逻辑 } private void doSignal(Node first) { do { // 步骤3:移除条件队列的头节点(更新firstWaiter) // 如果移除后队列为空,lastWaiter也置null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 断开当前节点和条件队列的关联 // 步骤4:把节点移到同步队列(transferForSignal()是核心) // transferForSignal返回false → 节点已取消,换下一个节点重试 } while (!transferForSignal(first) && (first = firstWaiter) != null); }具体的流程可以总结一下:
- 先检查当前线程是否拿到锁了,没拿到直接抛出异常,避免没拿锁的线程乱唤醒
- 将条件队列的头节点拿出来执行doSignal() 进行唤醒
- doSignal() 的逻辑就是将条件队列的头节点移除,如果移除完发现整个条件队列空了,就将lastWaiter也置null
- 执行transferForSignal() ,从头节点开始尝试将一个节点放到同步队列中,如果失败就继续下一个(因为有些接节点可能已经被取消了,但是来不及清除)
常见问题:
为什么await()要用 while 循环判断isOnSyncQueue(node)?
- 防止 “虚假唤醒”:线程可能被中断、超时等非 signal 原因唤醒,此时节点还没到同步队列,必须重新 park;
- 保证语义:只有节点到了同步队列,才能去抢锁,否则继续等。
fullyRelease(node)为什么要 “完全释放” 锁?
- 如果是重入锁(比如线程拿了 3 次锁,state=3),必须把 state 置 0,否则其他线程永远拿不到锁;
- 释放失败会标记节点为 CANCELLED,避免线程一直等。
条件队列是单向链表,同步队列是双向链表,为什么?
- 条件队列:只需要 “从头取、从尾加”,单向链表足够,且节省内存;
- 同步队列:需要频繁移除取消的节点(中断 / 超时),双向链表通过 prev 指针能快速找到前驱,O (1) 移除,效率更高。
signal()为什么只唤醒头节点,不是所有节点?
- 避免 “惊群效应”:唤醒所有线程会导致多个线程同时抢锁,只有一个能拿到,其余继续阻塞,浪费 CPU;
- 符合 FIFO 语义:条件队列也是先来后到,唤醒头节点最公平。
扩展:
条件队列的常见用法:基本不用自己去造,直接基于继承了AQS的类去实现就行
class ConditionObjectUsage { // 1. 定义基于AQS的独占锁(ReentrantLock底层就是AQS) private final ReentrantLock lock = new ReentrantLock(); // 2. 获取ConditionObject实例(lock.newCondition()直接返回ConditionObject) private final Condition condition = lock.newCondition(); // 业务条件(比如“订单是否支付成功”) private boolean isPaid = false; // ===== 线程1:等待条件满足 ===== public void waitForPayment() throws InterruptedException { // 第一步:必须先获取锁(调用await()前必须持有锁,否则抛异常) lock.lock(); try { // 第二步:循环判断条件(避免虚假唤醒,绝对不能用if) while (!isPaid) { // 条件不满足 → 释放锁,进入ConditionObject的条件队列等待 condition.await(); // 这里相当于阻塞住了 } // 第三步:条件满足,执行业务逻辑 System.out.println("订单已支付,执行发货逻辑"); } finally { // 第四步:释放锁(必须在finally,防止异常导致锁泄漏) lock.unlock(); } } // ===== 线程2:满足条件后唤醒等待线程 ===== public void signalPayment() { lock.lock(); try { // 第一步:修改业务条件(先满足条件,再唤醒) isPaid = true; // 第二步:唤醒条件队列中第一个等待的线程(底层调用ConditionObject.signal()) condition.signal(); // 如果要唤醒所有线程,用condition.signalAll(); } finally { lock.unlock(); } } // 测试 public static void main(String[] args) throws InterruptedException { ConditionObjectUsage demo = new ConditionObjectUsage(); // 启动线程1:等待支付 new Thread(() -> { try { demo.waitForPayment(); } catch (Exception e) { e.printStackTrace(); } }).start(); // 模拟1秒后支付完成 Thread.sleep(1000); // 启动线程2:唤醒等待线程 new Thread(demo::signalPayment).start(); } }到这里相信你已经能看懂了
说白了无论是哪个AQS的实现类,底层其实都是只有一把锁,就是基于AQS的statue实现的
那么这里的ReentrantLock也不例外,其实就是两个线程来回的抢锁并进行操作
线程1抢到之后发现自己的条件不满足,只能调用condition.await();释放锁的同时将当前线程阻塞住并塞入条件队列
条件2抢到锁之后会修改条件,并唤醒一个条件队列的线程,再释放锁
在线程1中虽然有两步能释放锁,但是没办法同时执行,因为执行了condition.await();释放锁的同时线程也被阻塞住了,不会向下执行,如果没执行到这个方法的,则通过lock.unlock();去释放
然后可能有些人还是会疑惑,同步队列和条件队列存储哪些线程?
同步队列存储那些抢锁失败的,条件队列存储抢锁成功但是不满足条件
也就是有两条路:
首先无论是新来的节点还是在同步队列中被唤醒的节点,它第一步会尝试CAS获取锁,如果获取成功,就判断是否满足条件,满足就执行逻辑,不满足就放到条件队列中
然后获取失败是存储到同步队列中
注意:想要存储到条件队列的前提是获取锁成功+条件不满足+不在同步队列中
不过只要你是获取锁成功的一定不会在同步队列中,即使原本在也会被移除