Java Queue 和 Deque 完全指南
"Queue 是数据流动的艺术" —— 从先进先出到双端操作,掌握队列就掌握了数据流转的核心
Queue 是 Java 集合框架的重要接口,用于表示**先进先出(FIFO)**的数据结构。想象一个排队系统:先来的人先得到服务,这就是Queue的精髓。
🎯 快速索引
- 基础概念 - 5分钟理解队列核心特性
- Queue实现对比 - 选择最适合的队列实现
- PriorityQueue深度解析 - 堆排序的队列实现
- Deque双端队列 - 支持两端操作的队列
- 并发队列 - 多线程环境下的队列设计
- 延迟队列 - 时间驱动的队列系统
- 实战应用 - 真实项目中的队列模式
- 性能优化 - 提升队列操作效率的技巧
🔍 什么是 Queue 和 Deque?
Queue 核心特性
Queue 是 Java 集合框架的核心接口,继承自 Collection,表示**先进先出(FIFO)**的数据结构:
| 特性 | 说明 | 实际场景 |
|---|---|---|
| 🚶 FIFO 原则 | 第一个插入的元素第一个被移除 | 任务调度、消息队列 |
| 📦 数据处理 | 按顺序处理元素 | 生产者-消费者模型 |
| ⏰ 缓冲机制 | 在数据生产者和消费者之间建立缓冲 | 异步处理、流量控制 |
| 🎯 有限容量 | 支持有界队列,防止内存溢出 | 限流器、任务池 |
| 🔌 阻塞语义 | 满时空时的阻塞/非阻塞行为 | 线程池、同步机制 |
Deque 双端队列特性
Queue vs Deque vs List 对比
| 维度 | Queue | Deque | List |
|---|---|---|---|
| 主要用途 | FIFO处理 | 双端操作 | 索引访问 |
| 插入位置 | 队尾 | 队头/队尾 | 任意位置 |
| 删除位置 | 队头 | 队头/队尾 | 任意位置 |
| 访问方式 | 只能访问队头 | 队头/队尾 | 随机访问 |
| 典型场景 | 任务调度 | 浏览器历史 | 数据存储 |
🏗️ 常见 Queue 实现对比
Queue 实现类详细对比表
| 实现类 | 底层结构 | 时间复杂度 | 线程安全 | 特殊功能 | 推荐指数 |
|---|---|---|---|---|---|
| PriorityQueue ⭐ | 二叉堆 | O(log n)插入,O(1)peek | ❌ | 优先级排序 | ⭐⭐⭐⭐ |
| ArrayDeque 🚀 | 循环数组 | O(1)头尾操作 | ❌ | 高性能双端 | ⭐⭐⭐⭐⭐ |
| LinkedList 🔗 | 双向链表 | O(1)头尾,O(n)中间 | ❌ | 支持索引访问 | ⭐⭐⭐ |
| ConcurrentLinkedQueue 🔄 | CAS链表 | O(1)无锁操作 | ✅ | 高并发无界 | ⭐⭐⭐⭐ |
| ArrayBlockingQueue 📦 | 数组 + 锁 | O(1)有界阻塞 | ✅ | 公平/非公平 | ⭐⭐⭐ |
| LinkedBlockingQueue ⛓️ | 链表 + 锁 | O(1)无界阻塞 | ✅ | 高吞吐阻塞 | ⭐⭐⭐⭐ |
Deque 实现类详细对比表
| 实现类 | 底层结构 | 时间复杂度 | 空间复杂度 | 线程安全 | 推荐指数 |
|---|---|---|---|---|---|
| ArrayDeque | 循环数组 | O(1)头尾操作 | O(n) | ❌ | ⭐⭐⭐⭐⭐ |
| LinkedList | 双向链表 | O(1)头尾,O(n)索引 | O(n) | ❌ | ⭐⭐⭐ |
| LinkedBlockingDeque | 链表 + 锁 | O(1)阻塞操作 | O(n) | ✅ | ⭐⭐⭐ |
| ConcurrentLinkedDeque | CAS链表 | O(1)无锁操作 | O(n) | ✅ | ⭐⭐⭐⭐ |
💡 选择建议:
- 高性能双端操作:
ArrayDeque(推荐首选)- 需要优先级:
PriorityQueue- 高并发场景:
ConcurrentLinkedQueue/ConcurrentLinkedDeque- 需要阻塞机制:
ArrayBlockingQueue/LinkedBlockingQueue- 需要索引访问:
LinkedList
Queue 应用场景矩阵
| 业务场景 | 推荐实现 | 性能特点 | 代码示例 |
|---|---|---|---|
| 任务调度 | PriorityQueue | 按优先级处理 | PriorityQueue<Task> tasks = new PriorityQueue<>(); |
| 浏览器历史 | ArrayDeque | 支持前进后退 | Deque<String> history = new ArrayDeque<>(); |
| 消息队列 | ConcurrentLinkedQueue | 高并发无界 | Queue<Message> queue = new ConcurrentLinkedQueue<>(); |
| 限流器 | ArrayBlockingQueue | 固定容量阻塞 | BlockingQueue<Request> limiter = new ArrayBlockingQueue<>(100); |
| 缓存淘汰 | ArrayDeque | LRU实现 | Deque<CacheItem> lru = new ArrayDeque<>(); |
| 延迟任务 | DelayQueue | 时间驱动处理 | DelayQueue<DelayedTask> scheduler = new DelayQueue<>(); |
🎯 PriorityQueue 优先级队列
核心原理
PriorityQueue 核心参数
public class PriorityQueue<E> extends AbstractQueue<E> {
// 默认初始容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 存储元素的数组
transient Object[] queue;
// 队列中元素的数量
private int size = 0;
// 比较器,如果为null则使用自然排序
private final Comparator<? super E> comparator;
// 修改次数(用于Fail-Fast)
transient int modCount = 0;
}
堆操作详解
public class PriorityQueueAnalysis {
// 插入操作 - 上浮调整
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
// 自然排序上浮
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1; // 计算父节点索引
Object e = queue[parent];
if (key.compareTo((E) e) >= 0) // 如果大于等于父节点,停止上浮
break;
queue[k] = e; // 父节点下沉
k = parent;
}
queue[k] = key; // 插入到正确位置
}
// 删除操作 - 下沉调整
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
// 自然排序下沉
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
int half = size >>> 1; // 只需要遍历到一半
while (k < half) {
int child = (k << 1) + 1; // 左子节点
Object c = queue[child];
int right = child + 1; // 右子节点
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right]; // 选择较小的子节点
if (key.compareTo((E) c) <= 0) // 如果小于等于最小子节点,停止下沉
break;
queue[k] = c; // 子节点上浮
k = child;
}
queue[k] = key; // 插入到正确位置
}
}
PriorityQueue 使用示例
public class PriorityQueueExamples {
// 1. 自然排序 - 数值从小到大
public void naturalOrdering() {
PriorityQueue<Integer> numbers = new PriorityQueue<>();
numbers.addAll(Arrays.asList(5, 2, 8, 1, 9, 3));
System.out.print("自然排序: ");
while (!numbers.isEmpty()) {
System.out.print(numbers.poll() + " "); // 1 2 3 5 8 9
}
}
// 2. 自定义排序 - 大顶堆
public void customOrdering() {
// 降序排列
PriorityQueue<Integer> maxHeap = new PriorityQueue<>(Comparator.reverseOrder());
maxHeap.addAll(Arrays.asList(5, 2, 8, 1, 9, 3));
System.out.print("降序排列: ");
while (!maxHeap.isEmpty()) {
System.out.print(maxHeap.poll() + " "); // 9 8 5 3 2 1
}
}
// 3. 对象优先级
public static class Task implements Comparable<Task> {
private final String name;
private final int priority;
private final long timestamp;
public Task(String name, int priority) {
this.name = name;
this.priority = priority;
this.timestamp = System.currentTimeMillis();
}
@Override
public int compareTo(Task other) {
// 先按优先级排序(数字越小优先级越高)
int priorityCompare = Integer.compare(this.priority, other.priority);
if (priorityCompare != 0) {
return priorityCompare;
}
// 优先级相同时按时间排序(先到先服务)
return Long.compare(this.timestamp, other.timestamp);
}
@Override
public String toString() {
return String.format("Task{name='%s', priority=%d}", name, priority);
}
// getters...
}
public void taskScheduling() {
PriorityQueue<Task> taskQueue = new PriorityQueue<>();
// 添加不同优先级的任务
taskQueue.offer(new Task("低优先级任务", 3));
taskQueue.offer(new Task("高优先级任务", 1));
taskQueue.offer(new Task("中优先级任务", 2));
taskQueue.offer(new Task("紧急任务", 1));
System.out.println("任务执行顺序:");
while (!taskQueue.isEmpty()) {
System.out.println("执行: " + taskQueue.poll());
}
}
// 4. TopK 问题解决
public List<Integer> findTopK(List<Integer> numbers, int k) {
if (k <= 0) return Collections.emptyList();
if (k >= numbers.size()) return new ArrayList<>(numbers);
// 使用小顶堆保持K个最大元素
PriorityQueue<Integer> minHeap = new PriorityQueue<>(k);
for (int num : numbers) {
if (minHeap.size() < k) {
minHeap.offer(num);
} else if (num > minHeap.peek()) {
minHeap.poll(); // 移除最小的
minHeap.offer(num); // 添加当前较大的
}
}
// 堆中元素就是前K大的,需要排序输出
List<Integer> result = new ArrayList<>(minHeap);
result.sort(Comparator.reverseOrder());
return result;
}
// 5. 中位数维护
public static class MedianFinder {
private final PriorityQueue<Integer> maxHeap; // 存储较小的一半
private final PriorityQueue<Integer> minHeap; // 存储较大的一半
public MedianFinder() {
maxHeap = new PriorityQueue<>(Comparator.reverseOrder()); // 大顶堆
minHeap = new PriorityQueue<>(); // 小顶堆
}
public void addNum(int num) {
// 先加入大顶堆
maxHeap.offer(num);
// 将大顶堆的最大元素移到小顶堆
minHeap.offer(maxHeap.poll());
// 如果小顶堆比大顶堆多超过1个,平衡一下
if (minHeap.size() > maxHeap.size()) {
maxHeap.offer(minHeap.poll());
}
}
public double findMedian() {
if (maxHeap.size() == minHeap.size()) {
return (maxHeap.peek() + minHeap.peek()) / 2.0;
} else {
return maxHeap.peek();
}
}
}
public void medianMaintenance() {
MedianFinder finder = new MedianFinder();
int[] numbers = {5, 15, 1, 3};
for (int num : numbers) {
finder.addNum(num);
System.out.printf("添加 %d 后的中位数: %.1f%n", num, finder.findMedian());
}
}
}
PriorityQueue 性能特点
public class PriorityQueuePerformance {
// 容量优化
public static void capacityOptimization() {
// ❌ 错误:频繁扩容
PriorityQueue<Integer> badQueue = new PriorityQueue<>();
for (int i = 0; i < 1000; i++) {
badQueue.offer(i); // 多次扩容:11→22→33→49→73→109→...
}
// ✅ 正确:预估容量
int expectedSize = 1000;
PriorityQueue<Integer> goodQueue = new PriorityQueue<>(expectedSize);
for (int i = 0; i < 1000; i++) {
goodQueue.offer(i); // 只需一次扩容或不需要扩容
}
}
// 性能基准测试
@Benchmark
public void priorityQueueInsert() {
PriorityQueue<Integer> queue = new PriorityQueue<>(10_000);
Random random = new Random();
for (int i = 0; i < 10_000; i++) {
queue.offer(random.nextInt(100_000));
}
}
// 与其他数据结构对比
public static void performanceComparison() {
int size = 100_000;
Random random = new Random();
List<Integer> data = new ArrayList<>();
for (int i = 0; i < size; i++) {
data.add(random.nextInt(size));
}
// PriorityQueue
long start = System.nanoTime();
PriorityQueue<Integer> pq = new PriorityQueue<>(data);
while (!pq.isEmpty()) {
pq.poll();
}
long pqTime = System.nanoTime() - start;
// Arrays.sort (排序整个数组)
start = System.nanoTime();
int[] array = data.stream().mapToInt(Integer::intValue).toArray();
Arrays.sort(array);
long sortTime = System.nanoTime() - start;
System.out.printf("PriorityQueue排序时间: %.2f ms%n", pqTime / 1_000_000.0);
System.out.printf("Arrays.sort排序时间: %.2f ms%n", sortTime / 1_000_000.0);
}
}
🔄 Deque 双端队列详解
ArrayDeque - 高性能循环数组实现
ArrayDeque 是 Java 中最高效的双端队列实现,基于循环数组的数据结构:
核心数据结构
public class ArrayDeque<E> extends AbstractCollection<E>
implements Deque<E>, Cloneable, Serializable {
// 存储元素的数组
transient Object[] elements;
// 队列头部的索引
transient int head;
// 队列尾部的索引
transient int tail;
// 最小初始容量
private static final int MIN_INITIAL_CAPACITY = 8;
}
循环数组原理
public class ArrayDequeAnalysis {
// 循环索引计算
private int inc(int i, int modulus) {
if (++i >= modulus) i = 0;
return i;
}
private int dec(int i, int modulus) {
if (--i < 0) i = modulus - 1;
return i;
}
private int add(int i, int distance, int modulus) {
if ((i += distance) - modulus >= 0) {
i -= modulus;
}
return i;
}
// 头部插入 - O(1)
public void addFirst(E e) {
if (e == null)
throw new NullPointerException();
elements[head = (head - 1) & (elements.length - 1)] = e;
if (head == tail)
doubleCapacity(); // 队列满时扩容
}
// 尾部插入 - O(1)
public void addLast(E e) {
if (e == null)
throw new NullPointerException();
elements[tail] = e;
if ((tail = (tail + 1) & (elements.length - 1)) == head)
doubleCapacity(); // 队列满时扩容
}
// 头部删除 - O(1)
public E pollFirst() {
@SuppressWarnings("unchecked")
E result = (E) elements[head];
if (result == null)
return null;
elements[h = (h + 1) & (elements.length - 1)] = null; // 清理引用
return result;
}
// 尾部删除 - O(1)
public E pollLast() {
@SuppressWarnings("unchecked")
E result = (E) elements[(tail - 1) & (elements.length - 1)];
if (result == null)
return null;
elements[tail = (tail - 1) & (elements.length - 1)] = null; // 清理引用
return result;
}
}
循环数组可视化
ArrayDeque 循环数组布局(容量8):
初始状态:
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ │ │ │ │ │ │ │ │
└───┴───┴───┴───┴───┴───┴───┴───┘
0 1 2 3 4 5 6 7
head=0, tail=0
添加 A, B, C:
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ A │ B │ C │ │ │ │ │ │
└───┴───┴───┴───┴───┴───┴───┴───┘
0 1 2 3 4 5 6 7
head=0, tail=3
头部添加 X, 尾部添加 Y:
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ A │ B │ C │ Y │ │ │ │ X │
└───┴───┴───┴───┴───┴───┴───┴───┘
0 1 2 3 4 5 6 7
head=7, tail=4
Deque 操作详解
public class DequeOperations {
public void demonstrateDequeOperations() {
Deque<String> deque = new ArrayDeque<>();
// === 队列操作 (FIFO) ===
System.out.println("=== 队列操作 ===");
deque.addLast("A"); // 入队
deque.addLast("B");
deque.addLast("C");
System.out.println("队列: " + deque); // [A, B, C]
String first = deque.pollFirst(); // 出队
System.out.println("出队: " + first); // A
System.out.println("剩余: " + deque); // [B, C]
// === 栈操作 (LIFO) ===
System.out.println("\n=== 栈操作 ===");
deque.clear();
deque.addFirst("X"); // 入栈
deque.addFirst("Y");
deque.addFirst("Z");
System.out.println("栈: " + deque); // [Z, Y, X]
String top = deque.pollFirst(); // 出栈
System.out.println("出栈: " + top); // Z
System.out.println("剩余: " + deque); // [Y, X]
// === 混合操作 ===
System.out.println("\n=== 混合操作 ===");
deque.clear();
// 头部操作
deque.addFirst("First");
deque.addFirst("Second");
// 尾部操作
deque.addLast("Third");
deque.addLast("Fourth");
System.out.println("最终队列: " + deque); // [Second, First, Third, Fourth]
System.out.println("头部: " + deque.getFirst()); // Second
System.out.println("尾部: " + deque.getLast()); // Fourth
// 两端删除
String headRemoved = deque.pollFirst();
String tailRemoved = deque.pollLast();
System.out.println("删除头部: " + headRemoved); // Second
System.out.println("删除尾部: " + tailRemoved); // Fourth
System.out.println("最终状态: " + deque); // [First, Third]
}
// 浏览器历史实现
public static class BrowserHistory {
private final Deque<String> backStack;
private final Deque<String> forwardStack;
private String currentPage;
public BrowserHistory() {
this.backStack = new ArrayDeque<>();
this.forwardStack = new ArrayDeque<>();
}
public void visit(String page) {
if (currentPage != null) {
backStack.push(currentPage);
}
currentPage = page;
forwardStack.clear(); // 访问新页面时清空前进栈
System.out.println("访问: " + currentPage);
}
public void back() {
if (backStack.isEmpty()) {
System.out.println("没有可后退的页面");
return;
}
forwardStack.push(currentPage);
currentPage = backStack.pop();
System.out.println("后退到: " + currentPage);
}
public void forward() {
if (forwardStack.isEmpty()) {
System.out.println("没有可前进的页面");
return;
}
backStack.push(currentPage);
currentPage = forwardStack.pop();
System.out.println("前进到: " + currentPage);
}
public void showHistory() {
System.out.println("当前页面: " + currentPage);
System.out.println("后退历史: " + backStack);
System.out.println("前进历史: " + forwardStack);
}
}
public void browserHistoryDemo() {
BrowserHistory browser = new BrowserHistory();
browser.visit("google.com");
browser.visit("github.com");
browser.visit("stackoverflow.com");
System.out.println("\n=== 浏览器历史 ===");
browser.showHistory();
System.out.println("\n=== 后退操作 ===");
browser.back(); // 回到 github.com
browser.back(); // 回到 google.com
System.out.println("\n=== 前进操作 ===");
browser.forward(); // 前进到 github.com
browser.visit("embracechw.top"); // 访问新页面
browser.showHistory();
}
// LRU缓存实现
public static class LRUCache<K, V> {
private final int capacity;
private final Deque<K> accessOrder;
private final Map<K, V> cache;
public LRUCache(int capacity) {
this.capacity = capacity;
this.accessOrder = new ArrayDeque<>();
this.cache = new HashMap<>();
}
public V get(K key) {
V value = cache.get(key);
if (value != null) {
// 更新访问顺序
accessOrder.remove(key);
accessOrder.addLast(key);
}
return value;
}
public void put(K key, V value) {
V existing = cache.get(key);
if (existing != null) {
// 更新现有值
cache.put(key, value);
// 更新访问顺序
accessOrder.remove(key);
accessOrder.addLast(key);
} else {
// 新增元素
if (cache.size() >= capacity) {
// 淘汰最少使用的元素
K lruKey = accessOrder.pollFirst();
if (lruKey != null) {
cache.remove(lruKey);
System.out.println("淘汰: " + lruKey);
}
}
cache.put(key, value);
accessOrder.addLast(key);
System.out.println("添加: " + key);
}
}
public void display() {
System.out.println("缓存状态: " + cache);
System.out.println("访问顺序: " + accessOrder);
}
}
public void lruCacheDemo() {
LRUCache<String, Integer> cache = new LRUCache<>(3);
cache.put("A", 1);
cache.put("B", 2);
cache.put("C", 3);
cache.display();
System.out.println("\n=== 访问 A ===");
cache.get("A");
cache.display();
System.out.println("\n=== 添加 D (触发淘汰) ===");
cache.put("D", 4);
cache.display();
System.out.println("\n=== 添加 E (触发淘汰) ===");
cache.put("E", 5);
cache.display();
}
}
🚀 并发队列机制
ConcurrentLinkedQueue - 无锁并发队列
public class ConcurrentLinkedQueueAnalysis {
// CAS操作核心
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
}
// 无锁入队操作
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是最后一个节点,尝试链接新节点
if (p.casNext(null, newNode)) {
// 链接成功,尝试更新tail
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// CAS失败,重试
}
else if (p == q)
// 遇到哨兵节点,从head重新开始
p = (t != (t = tail)) ? t : head;
else
// 移动到下一个节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
// 无锁出队操作
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 成功取出item
if (p != h) // hop two nodes at a time
casHead(h, p);
return item;
}
else if ((q = p.next) == null) {
// 队列为空
return null;
}
else if (p == q) {
// 遇到哨兵节点,重新开始
continue restartFromHead;
}
else {
// 移动到下一个节点
p = q;
}
}
}
}
}
并发队列性能测试
public class ConcurrentQueuePerformance {
// 单线程性能测试
@Benchmark
public void singleThreadQueueTest() {
int operations = 1_000_000;
// ArrayDeque
long start = System.nanoTime();
Deque<Integer> arrayDeque = new ArrayDeque<>();
for (int i = 0; i < operations; i++) {
arrayDeque.offer(i);
if (i % 100 == 0) {
arrayDeque.poll();
}
}
long arrayDequeTime = System.nanoTime() - start;
// ConcurrentLinkedQueue
start = System.nanoTime();
Queue<Integer> concurrentQueue = new ConcurrentLinkedQueue<>();
for (int i = 0; i < operations; i++) {
concurrentQueue.offer(i);
if (i % 100 == 0) {
concurrentQueue.poll();
}
}
long concurrentQueueTime = System.nanoTime() - start;
System.out.printf("ArrayDeque: %.2f ms%n", arrayDequeTime / 1_000_000.0);
System.out.printf("ConcurrentLinkedQueue: %.2f ms%n", concurrentQueueTime / 1_000_000.0);
System.out.printf("性能差异: %.1fx%n", (double) concurrentQueueTime / arrayDequeTime);
}
// 多线程性能测试
public void multiThreadQueueTest() throws InterruptedException {
int threadCount = 4;
int operationsPerThread = 250_000;
CountDownLatch latch = new CountDownLatch(threadCount);
// 测试ArrayDeque(非线程安全)
Deque<Integer> unsafeDeque = new ArrayDeque<>();
ThreadUnsafeProducer[] unsafeProducers = new ThreadUnsafeProducer[threadCount];
for (int i = 0; i < threadCount; i++) {
unsafeProducers[i] = new ThreadUnsafeProducer(unsafeDeque, operationsPerThread, latch);
}
long unsafeStart = System.nanoTime();
for (ThreadUnsafeProducer producer : unsafeProducers) {
producer.start();
}
latch.await();
long unsafeTime = System.nanoTime() - unsafeStart;
// 测试ConcurrentLinkedQueue(线程安全)
Queue<Integer> safeQueue = new ConcurrentLinkedQueue<>();
CountDownLatch safeLatch = new CountDownLatch(threadCount);
ThreadSafeProducer[] safeProducers = new ThreadSafeProducer[threadCount];
for (int i = 0; i < threadCount; i++) {
safeProducers[i] = new ThreadSafeProducer(safeQueue, operationsPerThread, safeLatch);
}
long safeStart = System.nanoTime();
for (ThreadSafeProducer producer : safeProducers) {
producer.start();
}
safeLatch.await();
long safeTime = System.nanoTime() - safeStart;
System.out.printf("ArrayDeque (线程不安全): %.2f ms, 结果大小: %d%n",
unsafeTime / 1_000_000.0, unsafeDeque.size());
System.out.printf("ConcurrentLinkedQueue (线程安全): %.2f ms, 结果大小: %d%n",
safeTime / 1_000_000.0, safeQueue.size());
}
static class ThreadUnsafeProducer extends Thread {
private final Deque<Integer> deque;
private final int operations;
private final CountDownLatch latch;
ThreadUnsafeProducer(Deque<Integer> deque, int operations, CountDownLatch latch) {
this.deque = deque;
this.operations = operations;
this.latch = latch;
}
@Override
public void run() {
try {
for (int i = 0; i < operations; i++) {
deque.offer(i);
if (i % 10 == 0 && !deque.isEmpty()) {
deque.poll();
}
}
} finally {
latch.countDown();
}
}
}
static class ThreadSafeProducer extends Thread {
private final Queue<Integer> queue;
private final int operations;
private final CountDownLatch latch;
ThreadSafeProducer(Queue<Integer> queue, int operations, CountDownLatch latch) {
this.queue = queue;
this.operations = operations;
this.latch = latch;
}
@Override
public void run() {
try {
for (int i = 0; i < operations; i++) {
queue.offer(i);
if (i % 10 == 0) {
queue.poll();
}
}
} finally {
latch.countDown();
}
}
}
}
⏰ DelayQueue 延迟队列
DelayQueue 原理与实现
public class DelayQueueAnalysis<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 内部使用PriorityQueue存储元素
private final transient PriorityQueue<E> q;
// 用于线程同步的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 用于等待的线程
private final transient Condition available = lock.newCondition();
// 用于快速查询的leader线程
private transient Thread leader = null;
// 获取延迟元素
@Override
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await(); // 队列为空,等待
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll(); // 延迟时间已到,返回元素
first = null; // don't retain ref while waiting
if (leader != null)
available.await(); // 已有leader线程,等待
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 等待剩余时间
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 唤醒下一个等待线程
lock.unlock();
}
}
}
DelayQueue 应用实例
public class DelayedTaskExample {
// 延迟任务实现
public static class DelayedTask implements Delayed {
private final String name;
private final long executeTime;
private final long createTime;
public DelayedTask(String name, long delayMillis) {
this.name = name;
this.createTime = System.currentTimeMillis();
this.executeTime = this.createTime + delayMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long remainingTime = executeTime - System.currentTimeMillis();
return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this == o) return 0;
long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public void execute() {
System.out.printf("执行任务: %s, 创建时间: %d, 执行时间: %d, 延迟: %d ms%n",
name, createTime, System.currentTimeMillis(),
System.currentTimeMillis() - createTime);
}
@Override
public String toString() {
return String.format("DelayedTask{name='%s', delay=%d ms}",
name, executeTime - createTime);
}
}
// 任务调度器
public static class TaskScheduler {
private final DelayQueue<DelayedTask> delayQueue;
private final ExecutorService executor;
private volatile boolean running = false;
public TaskScheduler(int workerThreads) {
this.delayQueue = new DelayQueue<>();
this.executor = Executors.newFixedThreadPool(workerThreads);
}
public void start() {
running = true;
// 启动消费者线程
Thread consumer = new Thread(() -> {
while (running && !Thread.currentThread().isInterrupted()) {
try {
DelayedTask task = delayQueue.take(); // 阻塞等待到期任务
executor.submit(() -> {
try {
task.execute();
} catch (Exception e) {
System.err.println("任务执行失败: " + task.name);
e.printStackTrace();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "DelayQueue-Consumer");
consumer.start();
System.out.println("任务调度器已启动");
}
public void schedule(String taskName, long delayMillis) {
DelayedTask task = new DelayedTask(taskName, delayMillis);
delayQueue.offer(task);
System.out.printf("已调度任务: %s, 延迟: %d ms%n", taskName, delayMillis);
}
public void stop() {
running = false;
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("任务调度器已停止");
}
// 获取队列状态
public QueueStatus getQueueStatus() {
List<DelayedTask> pendingTasks = new ArrayList<>();
delayQueue.drainTo(pendingTasks);
// 重新放回队列
for (DelayedTask task : pendingTasks) {
delayQueue.offer(task);
}
return new QueueStatus(pendingTasks.size(), delayQueue.size());
}
}
public static class QueueStatus {
private final int pendingCount;
private final int totalCount;
public QueueStatus(int pendingCount, int totalCount) {
this.pendingCount = pendingCount;
this.totalCount = totalCount;
}
@Override
public String toString() {
return String.format("QueueStatus{pending=%d, total=%d}", pendingCount, totalCount);
}
}
public void delayQueueDemo() throws InterruptedException {
TaskScheduler scheduler = new TaskScheduler(2);
scheduler.start();
// 添加不同延迟的任务
scheduler.schedule("即时任务", 0);
scheduler.schedule("1秒后任务", 1000);
scheduler.schedule("3秒后任务", 3000);
scheduler.schedule("2秒后任务", 2000);
scheduler.schedule("5秒后任务", 5000);
// 定期查看队列状态
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
QueueStatus status = scheduler.getQueueStatus();
System.out.println("队列状态: " + status);
}, 1, 1, TimeUnit.SECONDS);
// 运行8秒后停止
Thread.sleep(8000);
scheduler.stop();
monitor.shutdown();
}
// 订单超时处理
public static class OrderTimeoutManager {
private final DelayQueue<OrderTimeout> timeoutQueue;
private final Map<String, Order> activeOrders;
private final ExecutorService timeoutExecutor;
public OrderTimeoutManager() {
this.timeoutQueue = new DelayQueue<>();
this.activeOrders = new ConcurrentHashMap<>();
this.timeoutExecutor = Executors.newSingleThreadExecutor();
startTimeoutProcessor();
}
// 订单类
public static class Order {
private final String orderId;
private final String customerId;
private final BigDecimal amount;
private final long createTime;
private volatile OrderStatus status;
public Order(String orderId, String customerId, BigDecimal amount) {
this.orderId = orderId;
this.customerId = customerId;
this.amount = amount;
this.createTime = System.currentTimeMillis();
this.status = OrderStatus.PENDING;
}
// getters and setters...
}
// 订单超时事件
private static class OrderTimeout implements Delayed {
private final String orderId;
private final long timeoutTime;
public OrderTimeout(String orderId, long timeoutMillis) {
this.orderId = orderId;
this.timeoutTime = System.currentTimeMillis() + timeoutMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long remaining = timeoutTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.timeoutTime, ((OrderTimeout) o).timeoutTime);
}
}
// 订单状态枚举
public enum OrderStatus {
PENDING, CONFIRMED, CANCELLED, TIMEOUT
}
public void createOrder(Order order, long timeoutMillis) {
activeOrders.put(order.getOrderId(), order);
OrderTimeout timeout = new OrderTimeout(order.getOrderId(), timeoutMillis);
timeoutQueue.offer(timeout);
System.out.printf("创建订单: %s, 超时时间: %d 分钟%n",
order.getOrderId(), timeoutMillis / 60000);
}
public void confirmOrder(String orderId) {
Order order = activeOrders.get(orderId);
if (order != null && order.getStatus() == OrderStatus.PENDING) {
order.setStatus(OrderStatus.CONFIRMED);
System.out.printf("订单确认: %s%n", orderId);
}
}
public void cancelOrder(String orderId) {
Order order = activeOrders.remove(orderId);
if (order != null) {
order.setStatus(OrderStatus.CANCELLED);
System.out.printf("订单取消: %s%n", orderId);
}
}
private void startTimeoutProcessor() {
timeoutExecutor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
OrderTimeout timeout = timeoutQueue.take();
Order order = activeOrders.get(timeout.orderId);
if (order != null && order.getStatus() == OrderStatus.PENDING) {
order.setStatus(OrderStatus.TIMEOUT);
activeOrders.remove(timeout.orderId);
System.out.printf("订单超时: %s, 金额: %.2f%n",
order.getOrderId(), order.getAmount());
// 这里可以发送通知、退款等处理
handleOrderTimeout(order);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
private void handleOrderTimeout(Order order) {
// 超时处理逻辑
System.out.printf("处理超时订单: 发送退款通知给客户 %s%n", order.getCustomerId());
}
public void shutdown() {
timeoutExecutor.shutdown();
try {
if (!timeoutExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
timeoutExecutor.shutdownNow();
}
} catch (InterruptedException e) {
timeoutExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public void orderTimeoutDemo() throws InterruptedException {
OrderTimeoutManager timeoutManager = new OrderTimeoutManager();
// 创建一些订单
Order order1 = new Order("ORD001", "CUST001", new BigDecimal("299.00"));
Order order2 = new Order("ORD002", "CUST002", new BigDecimal("1599.00"));
Order order3 = new Order("ORD003", "CUST003", new BigDecimal("89.00"));
timeoutManager.createOrder(order1, 3000); // 3秒超时
timeoutManager.createOrder(order2, 5000); // 5秒超时
timeoutManager.createOrder(order3, 2000); // 2秒超时
// 模拟订单确认
Thread.sleep(1000);
timeoutManager.confirmOrder("ORD001");
// 等待处理完成
Thread.sleep(6000);
timeoutManager.shutdown();
}
}
🎯 Queue 实战应用与最佳实践
1️⃣ 生产者-消费者模式
public class ProducerConsumerPattern {
// 基础的生产者-消费者实现
public static class BasicProducerConsumer<T> {
private final BlockingQueue<T> queue;
private final ExecutorService executor;
public BasicProducerConsumer(int queueCapacity, int consumerCount) {
this.queue = new ArrayBlockingQueue<>(queueCapacity);
this.executor = Executors.newFixedThreadPool(consumerCount);
}
public void startConsumers(Consumer<T> consumer) {
for (int i = 0; i < consumerCount; i++) {
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
T item = queue.take(); // 阻塞等待
consumer.accept(item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
public boolean produce(T item) {
try {
return queue.offer(item, 1, TimeUnit.SECONDS); // 1秒超时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 日志处理系统
public static class LogProcessingSystem {
private final BlockingQueue<LogEvent> logQueue;
private final ExecutorService processorPool;
private final AtomicBoolean running = new AtomicBoolean(true);
public static class LogEvent {
private final String message;
private final LogLevel level;
private final long timestamp;
private final String threadName;
public LogEvent(String message, LogLevel level) {
this.message = message;
this.level = level;
this.timestamp = System.currentTimeMillis();
this.threadName = Thread.currentThread().getName();
}
// getters...
}
public enum LogLevel {
DEBUG, INFO, WARN, ERROR
}
public LogProcessingSystem(int queueSize, int processorCount) {
this.logQueue = new LinkedBlockingQueue<>(queueSize);
this.processorPool = Executors.newFixedThreadPool(processorCount);
startProcessors();
}
private void startProcessors() {
for (int i = 0; i < processorPool.getMaximumPoolSize(); i++) {
processorPool.submit(new LogProcessor());
}
}
private class LogProcessor implements Runnable {
@Override
public void run() {
List<LogEvent> batch = new ArrayList<>(100);
while (running.get() || !logQueue.isEmpty()) {
try {
// 批量拉取日志事件
LogEvent first = logQueue.poll(1, TimeUnit.SECONDS);
if (first != null) {
batch.add(first);
// 继续拉取更多事件(非阻塞)
logQueue.drainTo(batch, 99);
// 批量处理
processBatch(batch);
batch.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private void processBatch(List<LogEvent> events) {
// 按级别分组处理
Map<LogLevel, List<LogEvent>> eventsByLevel = events.stream()
.collect(Collectors.groupingBy(LogEvent::getLevel));
// 处理错误日志(需要立即处理)
eventsByLevel.getOrDefault(LogLevel.ERROR, Collections.emptyList())
.forEach(this::processErrorLog);
// 处理警告日志
eventsByLevel.getOrDefault(LogLevel.WARN, Collections.emptyList())
.forEach(this::processWarnLog);
// 处理信息日志
eventsByLevel.getOrDefault(LogLevel.INFO, Collections.emptyList())
.forEach(this::processInfoLog);
// 处理调试日志(低优先级)
eventsByLevel.getOrDefault(LogLevel.DEBUG, Collections.emptyList())
.forEach(this::processDebugLog);
}
private void processErrorLog(LogEvent event) {
System.err.printf("[ERROR] %s - %s: %s%n",
new Date(event.getTimestamp()),
event.getThreadName(),
event.getMessage());
// 可以发送到错误监控系统、发送邮件等
}
private void processWarnLog(LogEvent event) {
System.out.printf("[WARN] %s - %s: %s%n",
new Date(event.getTimestamp()),
event.getThreadName(),
event.getMessage());
}
private void processInfoLog(LogEvent event) {
System.out.printf("[INFO] %s - %s: %s%n",
new Date(event.getTimestamp()),
event.getThreadName(),
event.getMessage());
}
private void processDebugLog(LogEvent event) {
// 调试日志可以选择性处理或丢弃
// System.out.printf("[DEBUG] %s: %s%n",
// event.getThreadName(), event.getMessage());
}
}
public void log(String message, LogLevel level) {
LogEvent event = new LogEvent(message, level);
if (!logQueue.offer(event)) {
System.err.println("日志队列已满,丢弃日志: " + message);
}
}
public void shutdown() {
running.set(false);
processorPool.shutdown();
try {
if (!processorPool.awaitTermination(10, TimeUnit.SECONDS)) {
processorPool.shutdownNow();
}
} catch (InterruptedException e) {
processorPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public void logProcessingDemo() throws InterruptedException {
LogProcessingSystem logSystem = new LogProcessingSystem(1000, 2);
// 启动日志生产者
ExecutorService producers = Executors.newFixedThreadPool(3);
producers.submit(() -> {
for (int i = 0; i < 50; i++) {
logSystem.log("这是一条信息日志 " + i, LogProcessingSystem.LogLevel.INFO);
try { Thread.sleep(100); } catch (InterruptedException e) {}
}
});
producers.submit(() -> {
for (int i = 0; i < 20; i++) {
logSystem.log("这是一条警告日志 " + i, LogProcessingSystem.LogLevel.WARN);
try { Thread.sleep(200); } catch (InterruptedException e) {}
}
});
producers.submit(() -> {
for (int i = 0; i < 10; i++) {
logSystem.log("这是一条错误日志 " + i, LogProcessingSystem.LogLevel.ERROR);
try { Thread.sleep(500); } catch (InterruptedException e) {}
}
});
// 运行一段时间后关闭
Thread.sleep(5000);
producers.shutdown();
logSystem.shutdown();
}
}
2️⃣ 限流器实现
public class RateLimiterImplementations {
// 基于队列的令牌桶限流器
public static class TokenBucketRateLimiter {
private final BlockingQueue<Token> bucket;
private final ScheduledExecutorService refillScheduler;
private final int capacity;
private final int refillRate;
private static class Token {
private final long timestamp;
Token() {
this.timestamp = System.currentTimeMillis();
}
}
public TokenBucketRateLimiter(int capacity, int refillRatePerSecond) {
this.capacity = capacity;
this.refillRate = refillRatePerSecond;
this.bucket = new ArrayBlockingQueue<>(capacity);
this.refillScheduler = Executors.newSingleThreadScheduledExecutor();
// 初始化桶
for (int i = 0; i < capacity; i++) {
bucket.offer(new Token());
}
// 定期补充令牌
refillScheduler.scheduleAtFixedRate(() -> {
int currentSize = bucket.size();
int tokensToAdd = Math.min(refillRate, capacity - currentSize);
for (int i = 0; i < tokensToAdd; i++) {
if (!bucket.offer(new Token())) {
break; // 桶已满
}
}
}, 1, 1, TimeUnit.SECONDS);
}
public boolean tryAcquire() {
return bucket.poll() != null;
}
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return bucket.poll(timeout, unit) != null;
}
public void acquire() throws InterruptedException {
bucket.put(new Token()); // 阻塞直到获得令牌
}
public int getAvailableTokens() {
return bucket.size();
}
public void shutdown() {
refillScheduler.shutdown();
try {
if (!refillScheduler.awaitTermination(1, TimeUnit.SECONDS)) {
refillScheduler.shutdownNow();
}
} catch (InterruptedException e) {
refillScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 漏桶限流器(使用队列作为缓冲区)
public static class LeakyBucketRateLimiter {
private final BlockingQueue<Runnable> bucket;
private final ScheduledExecutorService leakScheduler;
private final int capacity;
private final int leakRate;
public LeakyBucketRateLimiter(int capacity, int leakRatePerSecond) {
this.capacity = capacity;
this.leakRate = leakRatePerSecond;
this.bucket = new LinkedBlockingQueue<>(capacity);
this.leakScheduler = Executors.newSingleThreadScheduledExecutor();
// 定期"漏水" - 处理队列中的任务
leakScheduler.scheduleAtFixedRate(() -> {
int processed = 0;
while (processed < leakRate && !bucket.isEmpty()) {
Runnable task = bucket.poll();
if (task != null) {
try {
task.run();
processed++;
} catch (Exception e) {
System.err.println("任务执行失败: " + e.getMessage());
}
}
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean submit(Runnable task) {
return bucket.offer(task);
}
public boolean submit(Runnable task, long timeout, TimeUnit unit) throws InterruptedException {
return bucket.offer(task, timeout, unit);
}
public int getQueueSize() {
return bucket.size();
}
public void shutdown() {
leakScheduler.shutdown();
try {
if (!leakScheduler.awaitTermination(1, TimeUnit.SECONDS)) {
leakScheduler.shutdownNow();
}
} catch (InterruptedException e) {
leakScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 滑动窗口限流器
public static class SlidingWindowRateLimiter {
private final Deque<Long> requestTimestamps;
private final int maxRequests;
private final long windowSizeMillis;
public SlidingWindowRateLimiter(int maxRequests, long windowSizeMillis) {
this.maxRequests = maxRequests;
this.windowSizeMillis = windowSizeMillis;
this.requestTimestamps = new ArrayDeque<>();
}
public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();
// 清理过期的请求时间戳
while (!requestTimestamps.isEmpty() &&
now - requestTimestamps.peekFirst() > windowSizeMillis) {
requestTimestamps.pollFirst();
}
// 检查是否超过限制
if (requestTimestamps.size() < maxRequests) {
requestTimestamps.offerLast(now);
return true;
}
return false;
}
public synchronized int getCurrentRequests() {
long now = System.currentTimeMillis();
// 清理过期的请求时间戳
while (!requestTimestamps.isEmpty() &&
now - requestTimestamps.peekFirst() > windowSizeMillis) {
requestTimestamps.pollFirst();
}
return requestTimestamps.size();
}
public synchronized long getWaitTimeMillis() {
if (requestTimestamps.size() < maxRequests) {
return 0;
}
long oldestRequest = requestTimestamps.peekFirst();
return windowSizeMillis - (System.currentTimeMillis() - oldestRequest);
}
}
public void rateLimiterDemo() throws InterruptedException {
System.out.println("=== 令牌桶限流器测试 ===");
TokenBucketRateLimiter tokenBucket = new TokenBucketRateLimiter(10, 2); // 容量10,每秒补充2个
for (int i = 0; i < 15; i++) {
if (tokenBucket.tryAcquire()) {
System.out.println("请求 " + (i + 1) + " 通过");
} else {
System.out.println("请求 " + (i + 1) + " 被拒绝");
}
Thread.sleep(200);
}
System.out.println("\n=== 滑动窗口限流器测试 ===");
SlidingWindowRateLimiter slidingWindow = new SlidingWindowRateLimiter(5, 1000); // 每秒最多5个请求
for (int i = 0; i < 10; i++) {
if (slidingWindow.tryAcquire()) {
System.out.println("请求 " + (i + 1) + " 通过,当前请求数: " + slidingWindow.getCurrentRequests());
} else {
System.out.println("请求 " + (i + 1) + " 被拒绝,等待时间: " + slidingWindow.getWaitTimeMillis() + "ms");
}
Thread.sleep(200);
}
tokenBucket.shutdown();
}
}
⚡ 性能优化指南
1. 队列选择策略
public class QueuePerformanceOptimization {
// 不同场景下的最优队列选择
public static Queue<Integer> selectOptimalQueue(QueueScenario scenario, int expectedSize) {
switch (scenario) {
case HIGH_CONCURRENCY:
return new ConcurrentLinkedQueue<>();
case BOUNDED_CAPACITY:
return new ArrayBlockingQueue<>(expectedSize);
case PRIORITY_PROCESSING:
PriorityQueue<Integer> pq = new PriorityQueue<>(expectedSize);
return pq;
case LIFO_OPERATIONS:
return new ArrayDeque<>();
case MEMORY_EFFICIENT:
return new PriorityQueue<>(); // 小顶堆通常内存效率高
case SEQUENTIAL_ACCESS:
return new ArrayDeque<>();
default:
return new ArrayDeque<>();
}
}
public enum QueueScenario {
HIGH_CONCURRENCY, // 高并发
BOUNDED_CAPACITY, // 有界容量
PRIORITY_PROCESSING, // 优先级处理
LIFO_OPERATIONS, // 后进先出
MEMORY_EFFICIENT, // 内存高效
SEQUENTIAL_ACCESS // 顺序访问
}
// 批量预分配优化
public static void capacityOptimization() {
int expectedSize = 10000;
// ❌ 错误:频繁扩容
Queue<String> badQueue = new PriorityQueue<>();
for (int i = 0; i < expectedSize; i++) {
badQueue.offer("item" + i); // 可能多次扩容
}
// ✅ 正确:预估容量
Queue<String> goodQueue = new PriorityQueue<>(expectedSize);
for (int i = 0; i < expectedSize; i++) {
goodQueue.offer("item" + i); // 避免扩容
}
// 对于ArrayBlockingQueue,容量必须预先指定
BlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(expectedSize);
}
// 批量操作优化
public static class BatchQueueProcessor<T> {
private final BlockingQueue<T> queue;
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
private final ExecutorService executor;
public BatchQueueProcessor(BlockingQueue<T> queue, int batchSize,
Consumer<List<T>> batchProcessor) {
this.queue = queue;
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
this.executor = Executors.newSingleThreadExecutor();
}
public void start() {
executor.submit(() -> {
List<T> batch = new ArrayList<>(batchSize);
while (!Thread.currentThread().isInterrupted()) {
try {
// 拉取第一个元素(阻塞)
T first = queue.poll(1, TimeUnit.SECONDS);
if (first != null) {
batch.add(first);
// 继续拉取剩余元素(非阻塞)
queue.drainTo(batch, batchSize - 1);
// 处理批次
if (!batch.isEmpty()) {
batchProcessor.accept(new ArrayList<>(batch));
batch.clear();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
// 处理剩余元素
if (!batch.isEmpty()) {
batchProcessor.accept(batch);
}
});
}
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 内存优化技巧
public static class MemoryOptimizedQueues {
// 使用原始类型队列(需要第三方库)
public void primitiveTypeQueue() {
// 使用 fastutil 库
// IntQueue intQueue = new IntLinkedOpenQueue();
// for (int i = 0; i < 1000000; i++) {
// intQueue.enqueue(i);
// }
}
// 及时清理队列引用
public static <T> void clearQueueWhenEmpty(Queue<T> queue, int checkInterval) {
if (queue.isEmpty()) {
// 对于大队列,显式清理有助于GC
if (queue instanceof PriorityQueue || queue instanceof ArrayDeque) {
// 这些实现可能有内部数组缓存
// 可以考虑重新创建以释放内存
// queue = new ArrayDeque<>();
}
}
}
// 使用WeakReference避免内存泄漏
public static class WeakReferenceQueue<T> {
private final Queue<WeakReference<T>> queue = new ConcurrentLinkedQueue<>();
private final ReferenceQueue<T> referenceQueue = new ReferenceQueue<>();
public void offer(T item) {
queue.offer(new WeakReference<>(item, referenceQueue));
cleanupReferences();
}
public T poll() {
cleanupReferences();
while (!queue.isEmpty()) {
WeakReference<T> ref = queue.poll();
T item = ref.get();
if (item != null) {
return item;
}
}
return null;
}
private void cleanupReferences() {
Reference<? extends T> ref;
while ((ref = referenceQueue.poll()) != null) {
// 从队列中移除已被GC的引用
queue.remove(ref);
}
}
}
}
}
📚 Queue 面试通关清单
🎯 基础概念 (必须掌握)
- Queue vs Deque 核心区别
- FIFO 和 LIFO 原则
- 不同队列实现的适用场景
- 阻塞队列和非阻塞队列的区别
🚀 进阶知识 (面试加分)
- PriorityQueue 堆排序实现
- ArrayDeque 循环数组原理
- ConcurrentLinkedQueue CAS无锁机制
- DelayQueue 时间驱动队列
- 队列的扩容机制
🔧 实战能力 (项目应用)
- 生产者-消费者模式实现
- 限流器设计与实现
- 任务调度器开发
- 性能优化技巧
- 内存泄漏避免
💎 总结
Queue 作为Java集合框架的重要组件,在数据流转、任务调度、流量控制等方面发挥着关键作用:
核心要点回顾:
- 🎯 ArrayDeque 为主力:高性能双端队列的首选实现
- 🚀 PriorityQueue 提供排序:基于堆的优先级处理能力
- 🔄 ConcurrentLinkedQueue 高并发:无锁并发场景的最佳选择
- ⏰ DelayQueue 时间驱动:延时任务和超时处理的利器
- 📦 阻塞队列背压控制:生产者-消费者模式的核心组件
- ⚡ 性能优化从预估开始:避免频繁扩容,选择合适的数据结构
实战建议:
- 单线程高性能:优先选择
ArrayDeque - 需要优先级:使用
PriorityQueue,注意容量预分配 - 高并发场景:选择
ConcurrentLinkedQueue等无锁实现 - 需要阻塞机制:使用
ArrayBlockingQueue或LinkedBlockingQueue - 时间驱动处理:
DelayQueue是延时任务的完美解决方案 - 批量操作优化:使用
drainTo等批量方法提升性能
掌握了这些知识点,你就可以在面试和实际开发中游刃有余地使用Queue和Deque了!🚀
学习建议:先掌握 ArrayDeque 的循环数组原理,再理解 PriorityQueue 的堆结构,最后学习并发队列的无锁机制。重点关注不同队列的性能特征和适用场景。
记住:选择正确的队列数据结构比算法优化更重要!