阻塞队列
BlockingQueue 是并发编程中最常用的组件之一,能够在队列满/空时自动阻塞生产者或消费者,为生产者-消费者模型提供了天然的“背压”能力。线程池、任务调度、异步日志等场景都离不开它。
接口概览 🧾
BlockingQueue 定义了 4 组典型操作(抛异常、返回特殊值、阻塞、超时):
| 操作 | 队列满时 | 队列空时 |
|---|---|---|
add(e) / remove() / element() | 抛异常(IllegalStateException) | 抛异常(NoSuchElementException) |
offer(e) / poll() / peek() | 返回 false / null | 返回 null |
put(e) / take() | 阻塞直至有空间 | 阻塞直至有元素 |
offer(e, timeout) / poll(timeout) | 等待指定时间后返回 false | 等待指定时间后返回 null |
小技巧:生产者使用
offer可避免永久阻塞;消费者使用poll(timeout)可实现“软超时”策略。
常见实现 🌟
| 队列 | 底层结构 | 是否有界 | 特点 | 应用 |
|---|---|---|---|---|
ArrayBlockingQueue | 数组 + ReentrantLock | 有界 | 先进先出,支持公平锁,入出共用一把锁 | 固定容量任务池 |
LinkedBlockingQueue | 链表 + Lock | 默认无界(可指定) | 入出两把锁,吞吐高但内存分散 | 线程池默认队列 |
SynchronousQueue | 无容量 | 零容量,交接点 | put 必须被 take 配对,支持公平/非公平模式 | newCachedThreadPool |
PriorityBlockingQueue | 二叉堆 | 无界 | 按优先级弹出,元素需 Comparable | 优先任务调度 |
DelayQueue | 优先队列 + Delayed | 无界 | 元素到期才能取,用于延迟任务 | 定时器、订单超时 |
LinkedTransferQueue | 链表 + CAS | 无界 | 支持直接传递/阻塞/非阻塞,吞吐极高 | 高性能事件分发 |
LinkedBlockingDeque | 双端链表 | 可指定容量 | 双端阻塞,支持双向操作 | 工作窃取(粗粒度) |
常用场景 🛠️
- 线程池:
ThreadPoolExecutor通过BlockingQueue管理任务。 - 异步日志:生产者写入队列,单消费者写磁盘,削峰填谷。
- 秒杀下单:阻塞队列限制瞬时请求量,保护数据库。
- 定时任务:
DelayQueue存放延迟任务,消费者按到期顺序执行。 - 限速器:结合
ArrayBlockingQueue提前放入令牌,消费者take令牌后才执行业务,实现漏桶算法。
示例代码 ✏️
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
// 生产者
executor.submit(() -> {
queue.put(() -> System.out.println("task"));
});
// 消费者
new Thread(() -> {
while (true) {
Runnable job = queue.take(); // 阻塞等待
job.run();
}
}).start();
DelayQueue 使用
class DelayTask implements Delayed {
private final long triggerTime;
private final String payload;
DelayTask(long delayMillis, String payload) {
this.triggerTime = System.currentTimeMillis() + delayMillis;
this.payload = payload;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = triggerTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.triggerTime, ((DelayTask) o).triggerTime);
}
}
DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayTask(5000, "expire order"));
DelayTask task = delayQueue.take(); // 到期后返回
选型建议 ✅
- 固定容量、均衡读写:
ArrayBlockingQueue - 大量数据、入出解耦:
LinkedBlockingQueue - 任务必须即时交付:
SynchronousQueue - 任务优先级:
PriorityBlockingQueue - 延迟/定时:
DelayQueue - 双端操作:
LinkedBlockingDeque - 需要直接交付/Transfer:
LinkedTransferQueue - 需要批量 drain:
LinkedBlockingQueue提供drainTo(Collection, n)
面试常问 🎤
-
SynchronousQueue为什么容量为 0?
它强调“交接”语义,生产者必须等消费者就绪才能完成交换,非常适合任务直接“交给”工作线程的模型。 -
线程池默认使用哪个队列?
Executors.newFixedThreadPool/newSingleThreadExecutor使用LinkedBlockingQueue;newCachedThreadPool使用SynchronousQueue。 -
阻塞队列如何实现阻塞?
通过ReentrantLock+Condition控制线程在队列满/空时await(),当入队/出队后调用signal()唤醒等待线程;LinkedBlockingQueue使用两把锁和两个条件,降低竞争。 -
DelayQueue 如何确定元素出队顺序?
元素必须实现Delayed接口,getDelay()返回剩余时间,内部PriorityQueue以最早到期的元素为堆顶,未到期时take()会阻塞。 -
ArrayBlockingQueue 和 LinkedBlockingQueue 谁更高效?
小数据量、频繁操作时数组实现缓存命中高,延迟低;链表实现使用两把锁,吞吐更大且容量可控,但节点对象多、GC 压力大。 -
如何避免阻塞队列“撑爆”内存?
优先选择有界队列并结合拒绝策略;对无界队列要监控长度,超阈值时丢弃或降级;也可搭配Semaphore做额外限流。 -
drainTo有什么用?
可一次性批量拉取元素到集合中,避免频繁唤醒/加锁,非常适合批处理、日志刷盘等场景。
小结
BlockingQueue是多线程解耦与限流的利器,选对实现才能充分发挥性能。- 熟悉 4 类操作语义与常见队列特性,是面试与实战的基础。
- 配合线程池、定时任务、监控告警等机制,可以构建稳定可靠的并发系统。