aliexpress|并发编程实践之公平有界阻塞队列实现

aliexpress|并发编程实践之公平有界阻塞队列实现


一 背景 JUC 工具包是 JAVA 并发编程的利器 。
本文讲述在没有 JUC 工具包帮助下 , 借助原生的 JAVA 同步原语 如何实现一个公平有界的阻塞队列 。
希望你也能在文后体会到并发编程的复杂之处 , 以及 JUC 工具包的强大 。
二 方法 本文使用到的基本工具:



























































【aliexpress|并发编程实践之公平有界阻塞队列实现】
同步监听器 synchronized, 方法基本和代码块级别; Object 基础类的 wait notify notifyAll; 基于以上基础工具 , 实现公平有界的阻塞队列 , 此处: 将公平的定义限定为 FIFO, 也就是先阻塞等待的请求 , 先解除等待; 并不保证解除等待后执行 Action 的先后顺序; 确保队列的大小始终不超过设定的容量;但阻塞等待的请求数不做限制; 三 实现 1 基础版本 首先 , 考虑在非并发场景下 , 借助 ADT 实现一个基础版本 interface Queue { boolean offer(Object obj); Object poll();class FairnessBoundedBlockingQueue implements Queue { // 当前大小 protected int size; // 容量 protected final int capacity; // 头指针 , empty: head.next == tail == null protected Node head; // 尾指针 protected Node tail; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0;// 如果队列已满 , 通过返回值标识 public boolean offer(Object obj) { if (sizecapacity) { Node node = new Node(obj); tail.next = node; tail = node; ++size; return true;return false;// 如果队列为空 , head.next == null;返回空元素 public Object poll() { if (head.next != null) { Object result = head.next.value; head.next.value = https://mparticle.uc.cn/api/null; head = head.next; // 丢弃头结点 --size; return resu return null;class Node { Object value; Node next; Node(Object obj) { this.value = obj; next = null;以上 定义支持队列的两个基础接口 ,poll 和 offer; 队列的实现 , 采用经典实现; 考虑在队列空的情况下 ,poll 返回为空 , 非阻塞; 队列在满的情况下 ,offer 返回 false, 入队不成功 , 无异常; 需要注意的一点:在出队时 , 本文通过迁移头结点的方式实现 , 避免修改尾结点 。 在下文实现并发版本时 , 会看到此处的用意 。2 并发版本 如果在并发场景下 , 上述的实现面临一些问题 , 同时未实现给定的一些需求 。通过添加 synchronized, 保证并发条件下的线程安全问题 。注意此处做同步的原因是为了保证类的不变式 。并发问题 在并发场景下 , 基础版本的实现面临的问题包括:原子性 , 可见性和指令重排的问题 。参考 JMM 的相关描述 。并发问题 , 最简单的解决方法是:通过 synchronized 加锁 , 一次性解决问题 。// 省略接口定义class BoundedBlockingQueue implements Queue { // 当前大小 protected int size; // 容量 protected final int capacity; // 头指针 , empty: head.next == tail == null protected Node head; // 尾指针 protected Node tail; public BoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0;// 如果队列已满 , 通过返回值标识 public synchronized boolean offer(Object obj) { if (sizecapacity) { Node node = new Node(obj); tail.next = node; tail = node; ++size; return true;return false;// 如果队列为空 , head.next == null;返回空元素 public synchronized Object poll() { if (head.next != null) { Object result = head.next.value; head.next.value = null; head = head.next; // 丢弃头结点 --size; return resu return null;// 省略 Node 的定义 以上 , 简单粗暴的加 synchronized 可以解决问题 , 但会引入新的问题:系统活性问题(此问题下文会解决) 。同时 , 简单加 synchronized 同步是无法实现阻塞等待;即 如果队列为空 , 那么出队的动作还是会立即返回 , 返回为空; 如果队列已满 , 那么入队动作还是会立即返回 , 返回操作不成功; 实现阻塞等待 , 需要借助 JAVA 中的 PV 原语:wait notify notifyAll。参考:JDK 中对 wait notify notifyAll 的相关描述 。卫式方法 阻塞等待 , 可以通过简单的卫式方法来实现 , 此问题本质上可以抽象为: 任何一个方法都需要在满足一定条件下才可以执行; 执行方法前需要首先校验不变式 , 然后执行变更; 在执行完成后 , 校验是否满足后验不变式; WHEN(condition) Object action(Object arg) { checkPreCondition(); doAction(arg); checkPostCondition(); 此种抽象 Ada 在语言层面上实现 。 在 JAVA 中 , 借助 wait notify notifyAll 可以翻译为: // 当前线程synchronized Object action(Object arg) { while(!condition) { wait();// 前置条件 , 不变式 checkPreCondition(); doAction(); // 后置条件 , 不变式 checkPostCondition();// 其他线程synchronized Object notifyAction(Object arg) { notifyAll(); 需要注意: 通常会采用 notifyAll 发送通知 , 而非 notify ;因为如果当前线程收到 notify 通知后被中断 , 那么系统将一直等待下去 。如果使用了 notifyAll 那么卫式语句必须放在 while 循环中;因为线程唤醒后 , 执行条件已经不满足 , 虽然当前线程持有互斥锁 。卫式条件的所有变量 , 有任何变更都需要发送 notifyAll 不然面临系统活性问题 据此 , 不难实现简单的阻塞版本的有界队列 , 如下 interface Queue { boolean offer(Object obj) throws InterruptedException; Object poll() throws InterruptedException;class FairnessBoundedBlockingQueue implements Queue { // 当前大小 protected int size; // 容量 protected final int capacity; // 头指针 , empty: head.next == tail == null protected Node head; // 尾指针 protected Node tail; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0;// 如果队列已满 , 通过返回值标识 public synchronized boolean offer(Object obj) throws InterruptedException { while (sizecapacity) { wait();Node node = new Node(obj); tail.next = node; tail = node; ++size; notifyAll(); // 可以出队 return true;// 如果队列为空 , 阻塞等待 public synchronized Object poll() throws InterruptedException { while (head.next == null) { wait();Object result = head.next.value; head.next.value = null; head = head.next; // 丢弃头结点 --size; notifyAll(); // 可以入队 return resu // 省略 Node 的定义 以上 , 实现了阻塞等待 , 但也引入了更大的性能问题 入队和出队动作阻塞等待同一把锁 , 恶性竞争; 当队列变更时 , 所有阻塞线程被唤醒 , 大量的线程上下文切换 , 竞争同步锁 , 最终可能只有一个线程能执行; 需要注意的点: 阻塞等待 wait 会抛出中断异常 。 关于异常的问题下文会处理; 接口需要支持抛出中断异常; 队里变更需要 notifyAll 避免线程中断或异常 , 丢失消息; 3 锁拆分优化 以上第一个问题 , 可以通过锁拆分来解决 , 即:定义两把锁 , 读锁和写锁;读写分离 。// 省略接口定义class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 头指针 , empty: head.next == tail == null protected Node head; // 尾指针 protected Node tail; // guard: canPollCount head protected final Object pollLock = new Object(); protected int canPollCount; // guard: canOfferCount tail protected final Object offerLock = new Object(); protected int canOfferCount; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.head = new Node(null); this.tail = head;// 如果队列已满 , 通过返回值标识 public boolean offer(Object obj) throws InterruptedException { synchronized(offerLock) { while(canOfferCount= 0) { offerLock.wait();Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--;synchronized(pollLock) { ++canPollCount; pollLock.notifyAll();return true;// 如果队列为空 , 阻塞等待 public Object poll() throws InterruptedException { Object result = null; synchronized(pollLock) { while(canPollCount= 0) { pollLock.wait();result = head.next.value; head.next.value = null; head = head.next; canPollCount--;synchronized(offerLock) { canOfferCount++; offerLock.notifyAll();return resu // 省略 Node 定义 以上 定义了两把锁 ,pollLock 和 offerLock 拆分出队和入队竞争; 入队锁同步的变量为:callOfferCount 和 tail; 出队锁同步的变量为:canPollCount 和 head; 出队的动作:首先拿到 pollLock 卫式等待后 , 完成出队动作;然后拿到 offerLock 发送通知 , 解除入队的等待线程 。入队的动作:首先拿到 offerLock 卫式等待后 , 完成入队的动作;然后拿到 pollLock 发送通知 , 解除出队的等待线程 。以上实现 确保通过入队锁和出队锁 , 分别保证入队和出队的原子性; 出队动作 , 通过特别的实现 , 确保出队只会变更 head, 避免获取 offerLock; 通过 offerLock.notifyAll 和 pollLock.notifyAll 解决读写竞争的问题; 但上述实现还有未解决的问题: 当有多个入队线程等待时 , 一次出队的动作会触发所有入队线程竞争 , 大量的线程上下文切换 , 最终只有一个线程能执行 。即 , 还有 读与读 和 写与写 之间的竞争问题 。4 状态追踪解除竞争 此处可以通过状态追踪 , 解除读与读之间和写与写之间的竞争问题 class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 头指针 , empty: head.next == tail == null protected Node head; // 尾指针 protected Node tail; // guard: canPollCount head protected final Object pollLock = new Object(); protected int canPollCount; protected int waitPollCount; // guard: canOfferCount tail protected final Object offerLock = new Object(); protected int canOfferCount; protected int waitOfferCount; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.waitPollCount = 0; this.waitOfferCount = 0; this.head = new Node(null); this.tail = head;// 如果队列已满 , 通过返回值标识 public boolean offer(Object obj) throws InterruptedException { synchronized(offerLock) { while(canOfferCount= 0) { waitOfferCount++; offerLock.wait(); waitOfferCount--;Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--;synchronized(pollLock) { ++canPollCount; if (waitPollCount0) { pollLock.notify();return true;// 如果队列为空 , 阻塞等待 public Object poll() throws InterruptedException { Object resusynchronized(pollLock) { while(canPollCount= 0) { waitPollCount++; pollLock.wait(); waitPollCount--;result = head.next.value; head.next.value = null; head = head.next; canPollCount--;synchronized(offerLock) { canOfferCount++; if (waitOfferCount0) { offerLock.notify();return resu // 省略 Node 的定义 以上 通过 waitOfferCount 和 waitPollCount 的状态追踪解决 读写内部的竞争问题; 当队列变更时 , 根据追踪的状态 , 决定是否派发消息 , 触发线程阻塞状态解除; 但 , 上述的实现在某些场景下会运行失败 , 面临活性问题 , 考虑 情况一: 初始状态队列为空 线程 A 执行出队动作 , 被阻塞在 pollLock此时 waitPollCount==1; 此时线程 A 在执行 wait 时被中断 , 抛出异常 ,waitPollCount==1 并未被重置; 阻塞队列为空 , 但 waitPollCount==1 类状态异常; 情况二: 初始状态队列为空 线程 A B 执行出队动作 , 被阻塞在 pollLock此时 waitPollCount==2; 线程 C 执行入队动作 , 可以立即执行 , 执行完成后 , 触发 pollLock 解除一个线程等待 notify; 触发的线程在 JVM 实现中是随机的 , 假设线程 A 被解除阻塞; 假设线程 A 在阻塞过程中已被中断 , 阻塞解除后 JVM 检查 interrupted 状态 , 抛出 InterruptedException 异常; 此时队列中有一个元素 , 但线程 A 仍阻塞在 pollLock 中 , 且一直阻塞下去; 以上为解除阻塞消息丢失的例子 , 问题的根源在与异常处理 。5 解决异常问题 解决线程中断退出的问题 , 线程校验中断状态的场景 JVM 通常只会在有限的几个场景检测线程的中断状态 ,wait Thread.join Thread.sleep; JVM 在检测到线程中断状态 Thread.interrupted() 后 , 会清除中断标志 , 抛出 InterruptedException; 通常为了保证线程对中断及时响应 ,run 方法中需要自主检测中断标志 , 中断线程 , 特别是对中断比较敏感需要保持类的不变式的场景; class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 头指针 , empty: head.next == tail == null protected Node head; // 尾指针 protected Node tail; // guard: canPollCount head waitPollCount protected final Object pollLock = new Object(); protected int canPollCount; protected int waitPollCount; // guard: canOfferCount tail waitOfferCount protected final Object offerLock = new Object(); protected int canOfferCount; protected int waitOfferCount; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.waitPollCount = 0; this.waitOfferCount = 0; this.head = new Node(null); this.tail = head;// 如果队列已满 , 通过返回值标识 public boolean offer(Object obj) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); // 线程已中断 , 直接退出即可 , 防止中断线程竞争锁synchronized(offerLock) { while(canOfferCount= 0) { waitOfferCount++; try { offerLock.wait();catch (InterruptedException e) { // 触发其他线程 offerLock.notify(); throw e;finally { waitOfferCount--;Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--;synchronized(pollLock) { ++canPollCount; if (waitPollCount0) { pollLock.notify();return true;// 如果队列为空 , 阻塞等待 public Object poll() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException();Object result = null; synchronized(pollLock) { while(canPollCount= 0) { waitPollCount++; try { pollLock.wait();catch (InterruptedException e) { pollLock.notify(); throw e;finally { waitPollCount--;result = head.next.value; head.next.value = 0; // ignore head; head = head.next; canPollCount--;synchronized(offerLock) { canOfferCount++; if (waitOfferCount0) { offerLock.notify();return resu // 省略 Node 的定义 以上 当等待线程中断退出时 , 捕获中断异常 , 通过 pollLock.notify 和 offerLock.notify 转发消息; 通过在 finally 中恢复状态追踪变量; 通过状态变量追踪可以解决读与读之间和写与写之间的锁竞争问题 。以下考虑如果解决读与读之间和写与写之间的公平性问题 。6 解决公平性 公平性的问题的解决需要将状态变量的追踪转换为:请求监视器追踪 。每个请求对应一个监视器; 通过内部维护一个 FIFO 队列 , 实现公平性; 在队列状态变更时 , 释放队列中的监视器; 以上逻辑可以统一抽象为 boolean needToWait;synchronized(this) { needToWait = calculateNeedToWait(); if (needToWait) { enqueue(monitor); // 请求对应的monitor if (needToWait) { monitor.doWait(); 需要注意 monitor.doWait() 需要在 this 的卫式语句之外 , 因为如果在内部 ,monitor.doWait 并不会释放 this锁; calculateNeedToWait() 需要在 this 的守卫之内完成 , 避免同步问题; 需要考虑中断异常的问题; 基于以上的逻辑抽象 , 实现公平队列 // 省略接口定义class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 头指针 , empty: head.next == tail == null protected Node head; // 尾指针 protected Node tail; // guard: canPollCount head pollQueue protected final Object pollLock = new Object(); protected int canPollCount; // guard: canOfferCount tail offerQueue protected final Object offerLock = new Object(); protected int canOfferCount; protected final WaitQueue pollQueue = new WaitQueue(); protected final WaitQueue offerQueue = new WaitQueue(); public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canOfferCount = capacity; this.canPollCount = 0; this.head = new Node(null); this.tail = head;// 如果队列已满 , 通过返回值标识 public boolean offer(Object obj) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); // 线程已中断 , 直接退出即可 , 防止中断线程竞争锁WaitNode wait = null; synchronized(offerLock) { // 在有阻塞请求或者队列为空时 , 阻塞等待 if (canOfferCount= 0 || !offerQueue.isEmpty()) { wait = new WaitNode(); offerQueue.enq(wait);else { // continue.try { if (wait != null) { wait.doWait();if (Thread.interrupted()) { throw new InterruptedException();catch (InterruptedException e) { offerQueue.doNotify(); throw e;// 确保此时线程状态正常 , 以下不会校验中断 synchronized(offerLock) { Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--;synchronized(pollLock) { ++canPollCount; pollQueue.doNotify();return true;// 如果队列为空 , 阻塞等待 public Object poll() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException();Object result = null; WaitNode wait = null; synchronized(pollLock) { // 在有阻塞请求或者队列为空时 , 阻塞等待 if (canPollCount= 0 || !pollQueue.isEmpty()) { wait = new WaitNode(); pollQueue.enq(wait);else { // ignoretry { if (wait != null) { wait.doWait();if (Thread.interrupted()) { throw new InterruptedException();catch (InterruptedException e) { // 传递消息 pollQueue.doNotify(); throw e;// 以下不会检测线程中断状态 synchronized(pollLock) { result = head.next.value; head.next.value = 0; // ignore head; head = head.next; canPollCount--;synchronized(offerLock) { canOfferCount++; offerQueue.doNotify();return resu class WaitQueue { WaitNode head; WaitNode tail; WaitQueue() { head = new WaitNode(); tail = head;synchronized void doNotify() { for(;;) { WaitNode node = deq(); if (node == null) { break;else if (node.doNotify()) { // 此处确保NOTIFY成功 break;else { // ignore and retry.synchronized boolean isEmpty() { return head.next == null;synchronized void enq(WaitNode node) { tail.next = node; tail = tail.next;synchronized WaitNode deq() { if (head.next == null) { return null;WaitNode res = head.next; head = head.next; if (head.next == null) { tail = head; // 为空 , 迁移tail节点return res;class WaitNode { boolean released; WaitNode next; WaitNode() { released = false; next = null;synchronized void doWait() throws InterruptedException { try { while (!released) { wait();catch (InterruptedException e) { if (!released) { released = true; throw e;else { // 如果是NOTIFY之后收到中断的信号 , 不能抛出异常;需要做RELAY处理 Thread.currentThread().interrupt();synchronized boolean doNotify() { if (!released) { released = true; notify(); // 明确释放了一个线程 , 返回true return true;else { // 没有释放新的线程 , 返回false return false;// 省略 Node 的定义 以上 核心是替换状态追踪变量为同步节点 ,WaitNode; WaitNode 通过简单的同步队列组织实现 FIFO 协议 , 每个线程等待各自的 WaitNode 监视器; WaitNode 内部维持 released 状态 , 标识线程阻塞状态是否被释放 , 主要是为了处理中断的问题; WaitQueue 本身是全同步的 , 由于已解决了读写竞争已经读写内部竞争的问题 ,WaitQueue 同步并不会造成问题; WaitQueue 是无界队列 , 是一个潜在的问题;但由于其只做同步的追踪 , 而且追踪的通常是线程 , 通常并不是问题; 最终的公平有界队列实现 , 无论是入队还是出队 , 首先卫式语句判定是否需要入队等待 , 如果入队等待 , 通过公平性协议等待; 当信号释放时 , 借助读写锁同步更新队列;最后同样借助读写锁 , 触发队列更新消息; 7 等待时间的问题 并发场景下 , 等待通常会设置为限时等待 TIMED_WAITING, 避免死锁或损失系统活性; 实现同步队列的限时等待 , 并没想象的那么困难 class TimeoutException extends InterruptedException {class WaitNode { boolean released; WaitNode next; WaitNode() { released = false; next = null;synchronized void doWait(long milliSeconds) throws InterruptedException { try { long startTime = System.currentTimeMillis(); long toWait = milliSeconds; for (;;) { wait(toWait); if (released) { return;long now = System.currentTimeMillis(); toWait = toWait - (now - startTime); if (toWait= 0) { throw new TimeoutException();catch (InterruptedException e) { if (!released) { released = true; throw e;else { // 如果已经释放信号量 , 此处不抛出异常;但恢复中断状态 Thread.currentThread().interrupt();synchronized boolean doNotify() { if (!released) { released = true; notify(); return true;else { return false;由于所有的等待都阻塞在 WaitNode 监视器 , 以上 首先定义超时异常 , 此处只是为了方便异常处理 , 继承 InterruptedException; 此处依赖于 wait(long timeout) 的超时等待实现 , 这通常不是问题; 最后 , 将 WaitNode 超时等待的逻辑 , 带入到 FairnessBoundedBlockingQueue 实现中 , 即可 。四 总结 本文通过一步步迭代 , 最终借助 JAVA 同步原语实现初版的公平有界队列 。 迭代实现过程中可以看到以下几点: 观念的转变 , 将调用一个类的方法思维转换为:在满足一定条件下方法才可以调用 , 在调用前需要满足不变式 , 调用后满足不变式;由于并发的问题很难测试 , 通常要采用卫式表达证明并发的正确性; 在迭代实现中会看到很多模式 , 比如 , 读写分离时 , 其实可以抽象为读锁和写锁;就得到了一个抽象的 Lock 的定义;比如 , 读写状态追踪 , 可以采用 Exchanger 抽象表达; 另外 , 本文的实现远非完善 , 还需要考虑支持 Iterator 遍历、状态查询及数据迁移等操作; 最后 , 相信大家再看 JUC 的工具包实现 , 定有不一样的体会 。作者 | 李新然 原文链接:https://developer.aliyun.com/article/800322 本文为阿里云原创内容 , 未经允许不得转载 。