跳到主要内容

阻塞队列

BlockingQueue 是并发编程中最常用的组件之一,能够在队列满/空时自动阻塞生产者或消费者,为生产者-消费者模型提供了天然的“背压”能力。线程池、任务调度、异步日志等场景都离不开它。


接口概览 🧾

BlockingQueue 定义了 4 组典型操作(抛异常、返回特殊值、阻塞、超时):

操作队列满时队列空时
add(e) / remove() / element()抛异常(IllegalStateException抛异常(NoSuchElementException
offer(e) / poll() / peek()返回 false / null返回 null
put(e) / take()阻塞直至有空间阻塞直至有元素
offer(e, timeout) / poll(timeout)等待指定时间后返回 false等待指定时间后返回 null

小技巧:生产者使用 offer 可避免永久阻塞;消费者使用 poll(timeout) 可实现“软超时”策略。


常见实现 🌟

队列底层结构是否有界特点应用
ArrayBlockingQueue数组 + ReentrantLock有界先进先出,支持公平锁,入出共用一把锁固定容量任务池
LinkedBlockingQueue链表 + Lock默认无界(可指定)入出两把锁,吞吐高但内存分散线程池默认队列
SynchronousQueue无容量零容量,交接点put 必须被 take 配对,支持公平/非公平模式newCachedThreadPool
PriorityBlockingQueue二叉堆无界按优先级弹出,元素需 Comparable优先任务调度
DelayQueue优先队列 + Delayed无界元素到期才能取,用于延迟任务定时器、订单超时
LinkedTransferQueue链表 + CAS无界支持直接传递/阻塞/非阻塞,吞吐极高高性能事件分发
LinkedBlockingDeque双端链表可指定容量双端阻塞,支持双向操作工作窃取(粗粒度)

常用场景 🛠️

  1. 线程池ThreadPoolExecutor 通过 BlockingQueue 管理任务。
  2. 异步日志:生产者写入队列,单消费者写磁盘,削峰填谷。
  3. 秒杀下单:阻塞队列限制瞬时请求量,保护数据库。
  4. 定时任务DelayQueue 存放延迟任务,消费者按到期顺序执行。
  5. 限速器:结合 ArrayBlockingQueue 提前放入令牌,消费者 take 令牌后才执行业务,实现漏桶算法。

示例代码 ✏️

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);

// 生产者
executor.submit(() -> {
queue.put(() -> System.out.println("task"));
});

// 消费者
new Thread(() -> {
while (true) {
Runnable job = queue.take(); // 阻塞等待
job.run();
}
}).start();

DelayQueue 使用

class DelayTask implements Delayed {
private final long triggerTime;
private final String payload;

DelayTask(long delayMillis, String payload) {
this.triggerTime = System.currentTimeMillis() + delayMillis;
this.payload = payload;
}

@Override
public long getDelay(TimeUnit unit) {
long diff = triggerTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
return Long.compare(this.triggerTime, ((DelayTask) o).triggerTime);
}
}

DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayTask(5000, "expire order"));
DelayTask task = delayQueue.take(); // 到期后返回

选型建议 ✅

  • 固定容量、均衡读写ArrayBlockingQueue
  • 大量数据、入出解耦LinkedBlockingQueue
  • 任务必须即时交付SynchronousQueue
  • 任务优先级PriorityBlockingQueue
  • 延迟/定时DelayQueue
  • 双端操作LinkedBlockingDeque
  • 需要直接交付/TransferLinkedTransferQueue
  • 需要批量 drainLinkedBlockingQueue 提供 drainTo(Collection, n)

面试常问 🎤

  1. SynchronousQueue 为什么容量为 0?
    它强调“交接”语义,生产者必须等消费者就绪才能完成交换,非常适合任务直接“交给”工作线程的模型。

  2. 线程池默认使用哪个队列?
    Executors.newFixedThreadPool / newSingleThreadExecutor 使用 LinkedBlockingQueuenewCachedThreadPool 使用 SynchronousQueue

  3. 阻塞队列如何实现阻塞?
    通过 ReentrantLock + Condition 控制线程在队列满/空时 await(),当入队/出队后调用 signal() 唤醒等待线程;LinkedBlockingQueue 使用两把锁和两个条件,降低竞争。

  4. DelayQueue 如何确定元素出队顺序?
    元素必须实现 Delayed 接口,getDelay() 返回剩余时间,内部 PriorityQueue 以最早到期的元素为堆顶,未到期时 take() 会阻塞。

  5. ArrayBlockingQueue 和 LinkedBlockingQueue 谁更高效?
    小数据量、频繁操作时数组实现缓存命中高,延迟低;链表实现使用两把锁,吞吐更大且容量可控,但节点对象多、GC 压力大。

  6. 如何避免阻塞队列“撑爆”内存?
    优先选择有界队列并结合拒绝策略;对无界队列要监控长度,超阈值时丢弃或降级;也可搭配 Semaphore 做额外限流。

  7. drainTo 有什么用?
    可一次性批量拉取元素到集合中,避免频繁唤醒/加锁,非常适合批处理、日志刷盘等场景。


小结

  • BlockingQueue 是多线程解耦与限流的利器,选对实现才能充分发挥性能。
  • 熟悉 4 类操作语义与常见队列特性,是面试与实战的基础。
  • 配合线程池、定时任务、监控告警等机制,可以构建稳定可靠的并发系统。