【Java 开发日记】我们来说一下无锁队列 Disruptor 的原理

【Java 开发日记】我们来说一下无锁队列 Disruptor 的原理

目录

一、为什么需要 Disruptor?—— 背景与问题

二、核心设计思想

三、核心组件与原理

1. 环形缓冲区(Ring Buffer)

2. 序列(Sequence)

3. 序列屏障(Sequence Barrier)

4. 等待策略(Wait Strategy)

5. 事件处理器(EventProcessor)

6. 生产者(Producer)

四、工作流程示例(单生产者 -> 单消费者)

五、多消费者与依赖关系

六、总结:Disruptor 高性能的秘诀


一、为什么需要 Disruptor?—— 背景与问题

在高并发编程中,传统的队列(如 java.util.concurrent.ArrayBlockingQueue 或 LinkedBlockingQueue)在高性能场景下会成为瓶颈,主要问题在于:

  1. 锁竞争:生产者和消费者之间使用同一把锁(或读写锁),导致线程频繁挂起、唤醒,上下文切换开销巨大。
  2. 伪共享:多个线程修改的、逻辑上独立但物理上相邻的变量,会因 CPU 缓存行的同步而导致性能急剧下降。
  3. 内存分配开销:对于链表结构的队列,每次入队出队都可能涉及节点对象的创建和垃圾回收,在高吞吐下 GC 压力大。
  4. 低效的遍历:队列的“头出尾入”设计,使得遍历和批量操作不够高效。

Disruptor 的目标就是解决这些问题,实现极低延迟、超高吞吐的线程间数据交换。

二、核心设计思想

Disruptor 不是一个传统意义上的 FIFO 队列,而是一个 基于数组的环形缓冲区(Ring Buffer) 。它的核心设计思想可以概括为以下几点:

1. 环形数组结构

  • 使用一个固定大小的数组预先分配所有内存,避免运行时动态内存分配。
  • 数组元素(Event)在初始化时就全部创建好,并被重复使用。这消除了 GC 压力。
  • 通过取模运算(实际是高效的位运算,要求数组大小为2的幂次)实现环形覆盖,指针无限递增,永不回收。

2. 无锁设计

  • 核心操作(生产与消费)完全无锁(Lock-Free),通过内存屏障(Memory Barrier) 和 CAS(Compare-And-Swap) 操作实现线程安全。
  • 生产者之间通过 CAS 竞争下一个可写的槽位。
  • 生产者和消费者之间通过序列(Sequence) 的协调来工作,消费者通过等待策略(Wait Strategy) 来感知新数据的到来。

3. 消除伪共享(Cache Line Padding)

  • 识别出会被多个线程频繁写入的关键变量(如生产者的 cursor,各个消费者的 Sequence)。
  • 通过在这些变量前后添加无意义的填充字节(padding),确保每个核心变量独占一个完整的 CPU 缓存行(通常为64字节),防止它们被意外地加载到同一个缓存行中,从而避免一个线程的写入使另一个线程的整个缓存行失效。

4. 批量与依赖关系

  • 支持批量处理事件,能极大提高吞吐量。
  • 可以显式地构建消费者之间的依赖关系图(如 A->B->C 或 A,B 都完成 -> C),实现高效的工作流。

三、核心组件与原理

1. 环形缓冲区(Ring Buffer)

这是 Disruptor 的物理存储核心。它是一个固定大小的 Object[] 数组。每个位置被称为一个“槽”(slot)。

  • size:必须是2的幂次(如 1024)。这样 sequence % size 可以通过 sequence & (size - 1) 位运算高效完成。
  • cursor:生产者发布事件的序列号。它代表最后成功发布的事件的位置。这是一个 Sequence 对象
  • 缓冲区本身不维护“头”和“尾”指针,头和尾的概念由生产者和消费者的 Sequence 共同决定。
2. 序列(Sequence)

Disruptor 的灵魂。它是一个使用 padding 封装的长整型(long)值。

  • 所有需要追踪进度的组件都有自己的 Sequence
    • Ring Buffer 有 cursor(一个 Sequence)。
    • 每个 EventProcessor(消费者)有自己的 Sequence,表示自己已处理完成的位置。
    • 每个 Producer(如果是多生产者)也有自己的 Sequence
  • Sequence 的值单调递增,代表对应组件在环形缓冲区中的位置。
  • 通过比较不同 Sequence 的值,就能知道生产和消费的进度关系。
3. 序列屏障(Sequence Barrier)

消费者用来协调工作、控制进度的核心工具。

  • 它持有:
    1. 生产者(或上游消费者)的 cursor 引用。
    2. 所有它所依赖的消费者的 Sequence 引用(用于构建依赖图)。
  • 当一个消费者想要消费事件时,它会询问它的 SequenceBarrier:“我可以安全消费的下一个事件是什么?”
  • SequenceBarrier 的逻辑是:返回 min(生产者cursor, 所有依赖的消费者的Sequence) 。这确保了消费者不会超越其依赖者,从而实现了无锁的有序消费
4. 等待策略(Wait Strategy)

定义了消费者如何等待新事件的到来。这是影响延迟和 CPU 占用的关键。

  • BlockingWaitStrategy:使用锁和条件变量。最节省CPU,但延迟最高。适用于异步日志等场景。
  • SleepingWaitStrategy:先自旋,后 Thread.yield(),最后使用 LockSupport.parkNanos(1)。平衡延迟和CPU。
  • YieldingWaitStrategy:先自旋100次,然后调用 Thread.yield()。延迟低,但会占用较多CPU。适用于要求极高吞吐、线程数小于CPU核心数的场景。
  • BusySpinWaitStrategy:纯自旋。延迟最低,但疯狂消耗CPU。必须在绑定核心、线程数少于物理核心数的场景下使用。
5. 事件处理器(EventProcessor)

消费者的执行体。通常指 BatchEventProcessor

  • 它是一个线程,其 run() 方法内部是一个循环:
    1. 通过 SequenceBarrier.waitFor(nextSequence) 等待自己可用的最大 nextSequence
    2. 获取到 availableSequence 后,从自己的当前 sequence 到 availableSequence 批量处理事件。
    3. 调用 EventHandler.onEvent() 处理每个事件。
    4. 处理完毕后,更新自己的消费者 Sequence 值。
6. 生产者(Producer)

负责向 Ring Buffer 发布事件。分为单生产者(Single Producer) 和多生产者(Multi Producer) 两种模式。

  • 发布过程(两阶段提交)
    1. 申请空间(Claim)
      • 单生产者:直接 nextSequence = cursor + 1(无竞争,无需CAS)。
      • 多生产者:通过 CAS 操作竞争递增一个 nextSequence
    2. 发布(Publish)
      • 生产者将数据写入 nextSequence 对应的 slot
      • 写入完成后,必须调用 RingBuffer.publish(sequence)
      • publish 方法会先添加内存屏障(store-store barrier,确保数据写入先于 cursor 更新),然后将 cursor 更新到 sequence
      • cursor 的更新会通知所有在 SequenceBarrier 上等待的消费者。

四、工作流程示例(单生产者 -> 单消费者)

  1. 初始化
    • Ring Buffer 大小为 8,cursor = -1
    • 消费者 Sequence = -1
  2. 生产者发布事件
    • 生产者需要发布事件 A。它申请下一个位置:next = cursor + 1 = 0
    • 它将事件 A 的数据写入 RingBuffer[0 & 7],即 RingBuffer[0]
    • 写入完成后,调用 publish(0),更新 cursor = 0
  3. 消费者消费事件
    • 消费者线程(BatchEventProcessor)在循环中调用 SequenceBarrier.waitFor(0)
    • SequenceBarrier 发现 cursor (0) >= 0,且没有依赖者,于是返回 availableSequence = 0
    • 消费者知道自己当前的 sequence (-1) < availableSequence (0),于是处理 RingBuffer[0] 的事件 A
    • 处理完成后,将自己的 Sequence 更新为 0
  4. 循环继续:生产者发布事件 B 到 slot 1,更新 cursor=1。消费者等待并处理,如此往复。

五、多消费者与依赖关系

这是 Disruptor 最强大的部分。例如,我们有三个消费者:C1(数据持久化),C2(数据统计),C3(发送消息,必须在 C1 和 C2 都完成后进行)。

  1. C3 的 SequenceBarrier 会持有 RingBuffer.cursorC1.sequence 和 C2.sequence
  2. 当 C3 调用 waitFor 时,SequenceBarrier 返回的是 min(生产者cursor, C1.sequence, C2.sequence)
  3. 这意味着,即使生产者已经发布了事件 10,但只要 C1 才处理到 5C3 最多也只能拿到 5。这样就保证了 C3 不会跑到 C1 前面去,完全无锁地实现了依赖

构建依赖图

RingBuffer -> C1 -> C2 -> C3 (依赖 C1 和 C2)

六、总结:Disruptor 高性能的秘诀

  1. 预分配内存,消除GC:环形数组 + 对象复用。
  2. 无锁并发:CAS + 内存屏障,取代重量级锁。
  3. 消除伪共享:对关键序列进行缓存行填充。
  4. 批量处理:一次等待,处理多个事件,摊薄开销。
  5. 依赖关系感知:通过序列比较实现无锁的消费者协调,避免了“线程间握手”的开销。
  6. 关注点分离:将并发控制(Sequence, Barrier)、等待逻辑(WaitStrategy)、业务处理(EventHandler)清晰地解耦。

Disruptor 本质上是一种精心设计的内存队列,它将共享变量的数量降到最低(核心就是那几个 Sequence),并通过硬件友好的方式(缓存行填充、内存屏障)来操作它们,从而在软件层面最大限度地压榨出现代 CPU 和内存子系统的性能。它特别适用于金融交易、高频计算、事件溯源等对延迟和吞吐有极端要求的领域。

如果小假的内容对你有帮助,请点赞评论收藏。创作不易,大家的支持就是我坚持下去的动力!

Read more

Flutter 组件 cleany 适配鸿蒙 HarmonyOS 实战:自动化清理矩阵,构建复杂应用的状态闭环与资源防腐架构

Flutter 组件 cleany 适配鸿蒙 HarmonyOS 实战:自动化清理矩阵,构建复杂应用的状态闭环与资源防腐架构

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 组件 cleany 适配鸿蒙 HarmonyOS 实战:自动化清理矩阵,构建复杂应用的状态闭环与资源防腐架构 前言 在鸿蒙(OpenHarmony)生态迈向多任务并行、长周期驻留及高频账户流转的全场景办公与生活背景下,如何确保应用在退出登录、环境切换或异常恢复时能够“不留痕迹”地销毁脏数据,已成为衡量应用健壮性的核心指标。在鸿蒙设备这类强调分布式沙箱隔离与严苛内存占用(Resident Set Size)管控的环境下,如果应用缺乏统一的资源清理机制,由于由于散落在各处的 Stream 监听、本地缓存及内存单例,极易由于由于状态残留导致不同用户间的数据越权或 UI 状态的逻辑死锁。 我们需要一种能够集中注册清理任务、支持并发异步销毁且具备原子性执行保障的状态复位框架。 cleany 为 Flutter 开发者引入了极其暴力且高效的“全域清算”范式。它通过中心化的管理器(Manager),允许各个业务模块在初始化时注册其对应的资源回收钩子。在适

By Ne0inhk
【Linux】Linux 开发必备:信号处理时机 + 中断向量表 + 系统调用表,内核态切换核心知识点

【Linux】Linux 开发必备:信号处理时机 + 中断向量表 + 系统调用表,内核态切换核心知识点

前言:欢迎各位光临本博客,这里小编带你直接手撕**,文章并不复杂,愿诸君**耐其心性,忘却杂尘,道有所长!!!! IF’Maxue:个人主页  🔥 个人专栏: 《C语言》 《C++深度学习》 《Linux》 《数据结构》 《数学建模》 ⛺️生活是默默的坚持,毅力是永久的享受。不破不立! 文章目录 * Linux信号处理时机与内核态/用户态深度解析 * 一、信号处理的“合适时机”是什么? * 合适的实际? * 结合课件 * 1.1 信号处理的触发时机 * 1.2 信号检查与处理流程 * 1.3 不同处理动作的差异 * 1.4 自定义捕捉的身份切换细节 * 二、硬件中断:OS运行的“驱动力” * 2.1 硬件中断的基本原理 * 2.

By Ne0inhk
Flutter 三方库 df_generate_dart_models_core 的鸿蒙化适配指南 - 实现自动化的数据模型代码生成、支持 JSON 反序列化模板定义与工程化规范一致性

Flutter 三方库 df_generate_dart_models_core 的鸿蒙化适配指南 - 实现自动化的数据模型代码生成、支持 JSON 反序列化模板定义与工程化规范一致性

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 df_generate_dart_models_core 的鸿蒙化适配指南 - 实现自动化的数据模型代码生成、支持 JSON 反序列化模板定义与工程化规范一致性 前言 在进行 Flutter for OpenHarmony 的大规模业务逻辑开发时,手动编写海量的 Data Models(POJO/Entity)以及配套的 fromJson/toJson 方法不仅枯燥乏味,还极易引入手写错误。df_generate_dart_models_core 是一个强大的代码生成核心库,它能将原始 JSON 样本或 Schema 自动转化为符合 Dart 规范的数据类代码。本文将指导大家如何将该库集成到鸿蒙项目的工程化提效链路中。 一、原理解析

By Ne0inhk
Flutter 三方库 username_gen 的鸿蒙化适配指南 - 实现具备语义化特征的随机用户名自动化生成、支持端侧快速原型开发与测试数据模拟实战

Flutter 三方库 username_gen 的鸿蒙化适配指南 - 实现具备语义化特征的随机用户名自动化生成、支持端侧快速原型开发与测试数据模拟实战

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 username_gen 的鸿蒙化适配指南 - 实现具备语义化特征的随机用户名自动化生成、支持端侧快速原型开发与测试数据模拟实战 前言 在进行 Flutter for OpenHarmony 的社交原型开发、内部压力测试或注册流程的兜底模拟时,如何快速产生大量、易读且不重复的用户名?手动硬编码 "test_user_1" 显然过于僵硬且不具备真实感。username_gen 是一款专注于基于形容词与名词组合建立“有趣”用户名的轻量级库。本文将探讨如何在鸿蒙端构建极致、敏捷的模拟数据填充体系。 一、原直观解析 / 概念介绍 1.1 基础原理 该库内置了一套精选的英文形容词库与名词库。通过洗牌算法(Shuffle)与自定义后缀注入逻辑,能在毫秒级产出符合 "AdjectiveNPC"

By Ne0inhk