
The Complete Guide to Java Queue and Deque

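"Queue is the art of data in motion": from first-in, first-out to double-ended operations, mastering queues means mastering the core of how data flows.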

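Queue is a core interface of the Java Collections Framework, typically used to model **first-in, first-out (FIFO)** data structures. Picture a waiting line: whoever arrives first is served first. That is the essence of a queue.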


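🔍 What are Queue and Deque?

Core characteristics of Queue

Queue extends Collection. Most implementations order elements **first-in, first-out (FIFO)**; PriorityQueue is the notable exception and orders by priority instead: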

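| Feature | Description | Typical use |
|---|---|---|
| 🚶 FIFO ordering | The first element inserted is the first one removed | Task scheduling, message queues |
| 📦 Ordered processing | Elements are processed in arrival order | Producer-consumer model |
| Buffering | A buffer sits between data producers and consumers | Asynchronous processing, flow control |
| 🎯 Bounded capacity | Bounded queues cap memory use and help prevent out-of-memory errors | Rate limiters, task pools |
| 🔌 Blocking semantics | Blocking or non-blocking behavior when the queue is full or empty | Thread pools, synchronization |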
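
The bounded-capacity and blocking rows are easiest to see on a full queue. The sketch below is a minimal illustration, assuming a one-slot ArrayBlockingQueue; the class name BoundedQueueDemo is hypothetical. It contrasts offer, which reports failure through its return value, with add, which throws.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original guide.
class BoundedQueueDemo {

    /** Describes what happens when inserting into a queue that is already full. */
    static String insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");                      // fits: capacity is 1

        boolean offered = queue.offer("second"); // full: returns false, no exception

        boolean addThrew = false;
        try {
            queue.add("second");                 // full: throws IllegalStateException
        } catch (IllegalStateException e) {
            addThrew = true;
        }
        return "offer=" + offered + ", addThrew=" + addThrew;
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue()); // offer=false, addThrew=true
    }
}
```

A third option, put, would block the caller until space becomes available, which is the behavior producer-consumer code usually wants.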

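Deque: double-ended queue characteristics

Queue vs Deque vs List

| Dimension | Queue | Deque | List |
|---|---|---|---|
| Primary use | FIFO processing | Operations at both ends | Indexed access |
| Insert position | Tail | Head or tail | Any position |
| Remove position | Head | Head or tail | Any position |
| Access | Head only | Head or tail | Random access |
| Typical scenario | Task scheduling | Browser history | Data storage |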

🏗️ Common Queue implementations compared

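Detailed comparison of Queue implementations

| Implementation | Underlying structure | Time complexity | Thread-safe | Notable feature | Rating |
|---|---|---|---|---|---|
| PriorityQueue | Binary heap | O(log n) insert, O(1) peek | No | Priority ordering | ⭐⭐⭐⭐ |
| ArrayDeque 🚀 | Circular array | O(1) at head and tail | No | Fast double-ended access | ⭐⭐⭐⭐⭐ |
| LinkedList 🔗 | Doubly linked list | O(1) at head and tail, O(n) in the middle | No | Indexed access | ⭐⭐⭐ |
| ConcurrentLinkedQueue 🔄 | CAS-based linked list | O(1), lock-free | Yes | Unbounded, high concurrency | ⭐⭐⭐⭐ |
| ArrayBlockingQueue 📦 | Array + lock | O(1), bounded and blocking | Yes | Fair or non-fair locking | ⭐⭐⭐ |
| LinkedBlockingQueue ⛓️ | Linked list + locks | O(1), blocking; unbounded by default, optionally bounded | Yes | High-throughput blocking | ⭐⭐⭐⭐ |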

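Detailed comparison of Deque implementations

| Implementation | Underlying structure | Time complexity | Space complexity | Thread-safe | Rating |
|---|---|---|---|---|---|
| ArrayDeque | Circular array | O(1) at head and tail | O(n) | No | ⭐⭐⭐⭐⭐ |
| LinkedList | Doubly linked list | O(1) at head and tail, O(n) by index | O(n) | No | ⭐⭐⭐ |
| LinkedBlockingDeque | Linked list + lock | O(1), blocking | O(n) | Yes | ⭐⭐⭐ |
| ConcurrentLinkedDeque | CAS-based linked list | O(1), lock-free | O(n) | Yes | ⭐⭐⭐⭐ |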

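💡 Choosing an implementation

  • Fast double-ended operations: ArrayDeque (the recommended default)
  • Priority ordering: PriorityQueue
  • High concurrency: ConcurrentLinkedQueue / ConcurrentLinkedDeque
  • Blocking behavior: ArrayBlockingQueue / LinkedBlockingQueue
  • Indexed access: LinkedList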

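Queue application matrix

| Scenario | Recommended implementation | Characteristic | Code example |
|---|---|---|---|
| Task scheduling | PriorityQueue | Processes by priority | PriorityQueue<Task> tasks = new PriorityQueue<>(); |
| Browser history | ArrayDeque | Supports back and forward | Deque<String> history = new ArrayDeque<>(); |
| Message queue | ConcurrentLinkedQueue | Unbounded, high concurrency | Queue<Message> queue = new ConcurrentLinkedQueue<>(); |
| Rate limiter | ArrayBlockingQueue | Fixed capacity, blocking | BlockingQueue<Request> limiter = new ArrayBlockingQueue<>(100); |
| Cache eviction | ArrayDeque | LRU implementation | Deque<CacheItem> lru = new ArrayDeque<>(); |
| Delayed tasks | DelayQueue | Time-driven processing | DelayQueue<DelayedTask> scheduler = new DelayQueue<>(); |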

🎯 PriorityQueue

How it works

Key fields of PriorityQueue

public class PriorityQueue<E> extends AbstractQueue<E> {
    // Default initial capacity
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    // Backing array that stores the heap
    transient Object[] queue;

    // Number of elements in the queue
    private int size = 0;

    // Comparator; if null, the elements' natural ordering is used
    private final Comparator<? super E> comparator;

    // Modification count (used for fail-fast iteration)
    transient int modCount = 0;
}

Heap operations in detail

// Excerpt adapted from the OpenJDK PriorityQueue source; E, queue, size and
// comparator refer to the type parameter and fields shown above.
public class PriorityQueueAnalysis {

    // Insertion: sift the new element up
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

    // Sift up using natural ordering
    @SuppressWarnings("unchecked")
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1; // index of the parent node
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0) // not smaller than the parent: stop
                break;
            queue[k] = e; // move the parent down
            k = parent;
        }
        queue[k] = key; // place the element at its final position
    }

    // Removal: sift the replacement element down
    private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }

    // Sift down using natural ordering
    @SuppressWarnings("unchecked")
    private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        int half = size >>> 1; // nodes at index >= half are leaves
        while (k < half) {
            int child = (k << 1) + 1; // left child
            Object c = queue[child];
            int right = child + 1; // right child
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right]; // pick the smaller child

            if (key.compareTo((E) c) <= 0) // not larger than the smaller child: stop
                break;

            queue[k] = c; // move the child up
            k = child;
        }
        queue[k] = key; // place the element at its final position
    }
}
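
One consequence of the heap layout above is that only the head is guaranteed to be the minimum; iterating or printing a PriorityQueue exposes the backing array, not sorted order. The following sketch (the class name HeapOrderDemo is hypothetical) contrasts the two, and deliberately makes no claim about the exact iteration order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo class, not part of the original guide.
class HeapOrderDemo {

    /** Drains the queue with poll(), which always removes the current minimum. */
    static List<Integer> drain(PriorityQueue<Integer> pq) {
        List<Integer> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            out.add(pq.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityQueue<Integer> pq = new PriorityQueue<>(Arrays.asList(5, 2, 8, 1, 9, 3));
        // toString() walks the backing array: heap order, not necessarily sorted
        System.out.println("iteration order: " + pq);
        // poll() re-heapifies after every removal, so this is sorted
        System.out.println("poll order:      " + drain(pq)); // [1, 2, 3, 5, 8, 9]
    }
}
```

If sorted traversal is needed without emptying the queue, copying the elements into a list and sorting that copy is the usual workaround.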

PriorityQueue usage examples

public class PriorityQueueExamples {

// 1. 自然排序 - 数值从小到大
public void naturalOrdering() {
PriorityQueue<Integer> numbers = new PriorityQueue<>();
numbers.addAll(Arrays.asList(5, 2, 8, 1, 9, 3));

System.out.print("自然排序: ");
while (!numbers.isEmpty()) {
System.out.print(numbers.poll() + " "); // 1 2 3 5 8 9
}
}

// 2. 自定义排序 - 大顶堆
public void customOrdering() {
// 降序排列
PriorityQueue<Integer> maxHeap = new PriorityQueue<>(Comparator.reverseOrder());
maxHeap.addAll(Arrays.asList(5, 2, 8, 1, 9, 3));

System.out.print("降序排列: ");
while (!maxHeap.isEmpty()) {
System.out.print(maxHeap.poll() + " "); // 9 8 5 3 2 1
}
}

// 3. 对象优先级
public static class Task implements Comparable<Task> {
private final String name;
private final int priority;
private final long timestamp;

public Task(String name, int priority) {
this.name = name;
this.priority = priority;
this.timestamp = System.currentTimeMillis();
}

@Override
public int compareTo(Task other) {
// 先按优先级排序(数字越小优先级越高)
int priorityCompare = Integer.compare(this.priority, other.priority);
if (priorityCompare != 0) {
return priorityCompare;
}
// 优先级相同时按时间排序(先到先服务)
return Long.compare(this.timestamp, other.timestamp);
}

@Override
public String toString() {
return String.format("Task{name='%s', priority=%d}", name, priority);
}

// getters...
}

public void taskScheduling() {
PriorityQueue<Task> taskQueue = new PriorityQueue<>();

// 添加不同优先级的任务
taskQueue.offer(new Task("低优先级任务", 3));
taskQueue.offer(new Task("高优先级任务", 1));
taskQueue.offer(new Task("中优先级任务", 2));
taskQueue.offer(new Task("紧急任务", 1));

System.out.println("任务执行顺序:");
while (!taskQueue.isEmpty()) {
System.out.println("执行: " + taskQueue.poll());
}
}

// 4. TopK 问题解决
public List<Integer> findTopK(List<Integer> numbers, int k) {
if (k <= 0) return Collections.emptyList();
if (k >= numbers.size()) return new ArrayList<>(numbers);

// 使用小顶堆保持K个最大元素
PriorityQueue<Integer> minHeap = new PriorityQueue<>(k);

for (int num : numbers) {
if (minHeap.size() < k) {
minHeap.offer(num);
} else if (num > minHeap.peek()) {
minHeap.poll(); // 移除最小的
minHeap.offer(num); // 添加当前较大的
}
}

// 堆中元素就是前K大的,需要排序输出
List<Integer> result = new ArrayList<>(minHeap);
result.sort(Comparator.reverseOrder());
return result;
}

// 5. 中位数维护
public static class MedianFinder {
private final PriorityQueue<Integer> maxHeap; // 存储较小的一半
private final PriorityQueue<Integer> minHeap; // 存储较大的一半

public MedianFinder() {
maxHeap = new PriorityQueue<>(Comparator.reverseOrder()); // 大顶堆
minHeap = new PriorityQueue<>(); // 小顶堆
}

public void addNum(int num) {
// 先加入大顶堆
maxHeap.offer(num);

// 将大顶堆的最大元素移到小顶堆
minHeap.offer(maxHeap.poll());

// If the min-heap now holds more elements than the max-heap, move its smallest element back
if (minHeap.size() > maxHeap.size()) {
maxHeap.offer(minHeap.poll());
}
}

public double findMedian() {
if (maxHeap.size() == minHeap.size()) {
return (maxHeap.peek() + minHeap.peek()) / 2.0;
} else {
return maxHeap.peek();
}
}
}

public void medianMaintenance() {
MedianFinder finder = new MedianFinder();
int[] numbers = {5, 15, 1, 3};

for (int num : numbers) {
finder.addNum(num);
System.out.printf("添加 %d 后的中位数: %.1f%n", num, finder.findMedian());
}
}
}
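
The Task example above breaks priority ties with a millisecond timestamp. Tasks created within the same millisecond then compare as equal, and PriorityQueue makes no ordering promise between equal elements. A common alternative is a monotonically increasing sequence number. The sketch below assumes a hypothetical FifoTask class and is one possible way to get first-come, first-served behavior among equal priorities.

```java
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical variant of the Task class above; names are illustrative.
class FifoTask implements Comparable<FifoTask> {
    private static final AtomicLong SEQUENCE = new AtomicLong();

    final String name;
    final int priority;
    final long seq = SEQUENCE.getAndIncrement(); // unique, strictly increasing

    FifoTask(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    @Override
    public int compareTo(FifoTask other) {
        // Lower number means higher priority; ties fall back to creation order
        int byPriority = Integer.compare(priority, other.priority);
        return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
    }

    /** Polls every task and joins the names with commas. */
    static String drainNames(PriorityQueue<FifoTask> queue) {
        StringBuilder sb = new StringBuilder();
        while (!queue.isEmpty()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(queue.poll().name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        PriorityQueue<FifoTask> queue = new PriorityQueue<>();
        queue.offer(new FifoTask("low", 3));
        queue.offer(new FifoTask("high", 1));
        queue.offer(new FifoTask("medium", 2));
        queue.offer(new FifoTask("urgent", 1));
        System.out.println(drainNames(queue)); // high,urgent,medium,low
    }
}
```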

PriorityQueue performance characteristics

public class PriorityQueuePerformance {

// 容量优化
public static void capacityOptimization() {
// ❌ Avoid: repeated resizing
PriorityQueue<Integer> badQueue = new PriorityQueue<>();
for (int i = 0; i < 1000; i++) {
badQueue.offer(i); // resizes several times (in OpenJDK: 11→24→50→102→153→...)
}

// ✅ Better: size the queue up front
int expectedSize = 1000;
PriorityQueue<Integer> goodQueue = new PriorityQueue<>(expectedSize);
for (int i = 0; i < 1000; i++) {
goodQueue.offer(i); // never resizes: the initial capacity already fits 1000 elements
}
}

// 性能基准测试
@Benchmark
public void priorityQueueInsert() {
PriorityQueue<Integer> queue = new PriorityQueue<>(10_000);
Random random = new Random();

for (int i = 0; i < 10_000; i++) {
queue.offer(random.nextInt(100_000));
}
}

// 与其他数据结构对比
public static void performanceComparison() {
int size = 100_000;
Random random = new Random();
List<Integer> data = new ArrayList<>();
for (int i = 0; i < size; i++) {
data.add(random.nextInt(size));
}

// PriorityQueue
long start = System.nanoTime();
PriorityQueue<Integer> pq = new PriorityQueue<>(data);
while (!pq.isEmpty()) {
pq.poll();
}
long pqTime = System.nanoTime() - start;

// Arrays.sort (排序整个数组)
start = System.nanoTime();
int[] array = data.stream().mapToInt(Integer::intValue).toArray();
Arrays.sort(array);
long sortTime = System.nanoTime() - start;

System.out.printf("PriorityQueue排序时间: %.2f ms%n", pqTime / 1_000_000.0);
System.out.printf("Arrays.sort排序时间: %.2f ms%n", sortTime / 1_000_000.0);
}
}
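
To make the cost of starting from the default capacity concrete, the resize sequence can be worked out by hand. The sketch below assumes the growth rule used in OpenJDK's PriorityQueue (roughly double plus 2 while the array is shorter than 64, then grow by 50%); this is an implementation detail that may differ between JDK versions, and the class name GrowthSequence is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; mirrors an OpenJDK implementation detail, not a public API.
class GrowthSequence {

    /** Assumed growth rule: old + (old + 2) for small arrays, old + old/2 otherwise. */
    static int nextCapacity(int old) {
        return old + (old < 64 ? old + 2 : old >> 1);
    }

    /** Lists every capacity visited while growing from initial until needed fits. */
    static List<Integer> capacitiesUntil(int initial, int needed) {
        List<Integer> steps = new ArrayList<>();
        int capacity = initial;
        steps.add(capacity);
        while (capacity < needed) {
            capacity = nextCapacity(capacity);
            steps.add(capacity);
        }
        return steps;
    }

    public static void main(String[] args) {
        // [11, 24, 50, 102, 153, 229, 343, 514, 771, 1156]
        System.out.println(capacitiesUntil(11, 1000));
    }
}
```

Under this rule, inserting 1000 elements into a default-sized queue copies the backing array nine times, while a queue created with capacity 1000 never copies it.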

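🔄 Deque in depth

ArrayDeque: a fast circular-array implementation

ArrayDeque is generally the fastest general-purpose Deque in the JDK for single-threaded code. It is backed by a circular array:

Core data structure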

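public class ArrayDeque<E> extends AbstractCollection<E>
        implements Deque<E>, Cloneable, Serializable {

    // Backing array that stores the elements
    transient Object[] elements;

    // Index of the element at the head of the deque
    transient int head;

    // Index at which the next element would be added to the tail
    transient int tail;

    // Minimum initial capacity
    private static final int MIN_INITIAL_CAPACITY = 8;
}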

How the circular array works

// Excerpt adapted from the OpenJDK ArrayDeque source; E, elements, head and tail
// refer to the type parameter and fields shown above.
public class ArrayDequeAnalysis {

    // Circular index helpers (explicit modulus)
    private int inc(int i, int modulus) {
        if (++i >= modulus) i = 0;
        return i;
    }

    private int dec(int i, int modulus) {
        if (--i < 0) i = modulus - 1;
        return i;
    }

    private int add(int i, int distance, int modulus) {
        if ((i += distance) - modulus >= 0) {
            i -= modulus;
        }
        return i;
    }

    // The methods below wrap indices with a bitmask instead,
    // which requires elements.length to be a power of two.

    // Insert at the head: O(1)
    public void addFirst(E e) {
        if (e == null)
            throw new NullPointerException();
        elements[head = (head - 1) & (elements.length - 1)] = e;
        if (head == tail)
            doubleCapacity(); // the array is full: grow it
    }

    // Insert at the tail: O(1)
    public void addLast(E e) {
        if (e == null)
            throw new NullPointerException();
        elements[tail] = e;
        if ((tail = (tail + 1) & (elements.length - 1)) == head)
            doubleCapacity(); // the array is full: grow it
    }

    // Remove from the head: O(1)
    public E pollFirst() {
        int h = head;
        @SuppressWarnings("unchecked")
        E result = (E) elements[h];
        if (result == null)
            return null; // the deque is empty
        elements[h] = null; // clear the slot so the element can be collected
        head = (h + 1) & (elements.length - 1); // advance head, wrapping around
        return result;
    }

    // Remove from the tail: O(1)
    public E pollLast() {
        int t = (tail - 1) & (elements.length - 1);
        @SuppressWarnings("unchecked")
        E result = (E) elements[t];
        if (result == null)
            return null; // the deque is empty
        elements[t] = null; // clear the slot so the element can be collected
        tail = t;
        return result;
    }
}

Visualizing the circular array

ArrayDeque circular array layout (capacity 8):

Initial state:
┌───┬───┬───┬───┬───┬───┬───┬───┐
│   │   │   │   │   │   │   │   │
└───┴───┴───┴───┴───┴───┴───┴───┘
  0   1   2   3   4   5   6   7
head=0, tail=0

After adding A, B, C at the tail:
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ A │ B │ C │   │   │   │   │   │
└───┴───┴───┴───┴───┴───┴───┴───┘
  0   1   2   3   4   5   6   7
head=0, tail=3

After adding X at the head and Y at the tail:
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ A │ B │ C │ Y │   │   │   │ X │
└───┴───┴───┴───┴───┴───┴───┴───┘
  0   1   2   3   4   5   6   7
head=7, tail=4
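
The wrap-around in the last diagram comes from masking the index with capacity - 1, which only works when the capacity is a power of two. The sketch below replays the diagram with a deliberately simplified ring; RingIndexDemo is a hypothetical class, and resizing and removal are omitted.

```java
// Hypothetical, simplified ring buffer; no resizing, no removal.
class RingIndexDemo {
    final Object[] elements = new Object[8]; // power-of-two capacity, as in the diagram
    int head = 0;
    int tail = 0;

    void addFirst(Object e) {
        // (0 - 1) & 7 == 7, so the head wraps to the last slot
        head = (head - 1) & (elements.length - 1);
        elements[head] = e;
    }

    void addLast(Object e) {
        elements[tail] = e;
        tail = (tail + 1) & (elements.length - 1);
    }

    public static void main(String[] args) {
        RingIndexDemo ring = new RingIndexDemo();
        ring.addLast("A");
        ring.addLast("B");
        ring.addLast("C");
        ring.addFirst("X");
        ring.addLast("Y");
        System.out.println("head=" + ring.head + ", tail=" + ring.tail); // head=7, tail=4
    }
}
```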

Deque operations in detail

public class DequeOperations {

public void demonstrateDequeOperations() {
Deque<String> deque = new ArrayDeque<>();

// === 队列操作 (FIFO) ===
System.out.println("=== 队列操作 ===");
deque.addLast("A"); // 入队
deque.addLast("B");
deque.addLast("C");
System.out.println("队列: " + deque); // [A, B, C]

String first = deque.pollFirst(); // 出队
System.out.println("出队: " + first); // A
System.out.println("剩余: " + deque); // [B, C]

// === 栈操作 (LIFO) ===
System.out.println("\n=== 栈操作 ===");
deque.clear();
deque.addFirst("X"); // 入栈
deque.addFirst("Y");
deque.addFirst("Z");
System.out.println("栈: " + deque); // [Z, Y, X]

String top = deque.pollFirst(); // 出栈
System.out.println("出栈: " + top); // Z
System.out.println("剩余: " + deque); // [Y, X]

// === 混合操作 ===
System.out.println("\n=== 混合操作 ===");
deque.clear();

// 头部操作
deque.addFirst("First");
deque.addFirst("Second");

// 尾部操作
deque.addLast("Third");
deque.addLast("Fourth");

System.out.println("最终队列: " + deque); // [Second, First, Third, Fourth]
System.out.println("头部: " + deque.getFirst()); // Second
System.out.println("尾部: " + deque.getLast()); // Fourth

// 两端删除
String headRemoved = deque.pollFirst();
String tailRemoved = deque.pollLast();
System.out.println("删除头部: " + headRemoved); // Second
System.out.println("删除尾部: " + tailRemoved); // Fourth
System.out.println("最终状态: " + deque); // [First, Third]
}

// 浏览器历史实现
public static class BrowserHistory {
private final Deque<String> backStack;
private final Deque<String> forwardStack;
private String currentPage;

public BrowserHistory() {
this.backStack = new ArrayDeque<>();
this.forwardStack = new ArrayDeque<>();
}

public void visit(String page) {
if (currentPage != null) {
backStack.push(currentPage);
}
currentPage = page;
forwardStack.clear(); // 访问新页面时清空前进栈
System.out.println("访问: " + currentPage);
}

public void back() {
if (backStack.isEmpty()) {
System.out.println("没有可后退的页面");
return;
}

forwardStack.push(currentPage);
currentPage = backStack.pop();
System.out.println("后退到: " + currentPage);
}

public void forward() {
if (forwardStack.isEmpty()) {
System.out.println("没有可前进的页面");
return;
}

backStack.push(currentPage);
currentPage = forwardStack.pop();
System.out.println("前进到: " + currentPage);
}

public void showHistory() {
System.out.println("当前页面: " + currentPage);
System.out.println("后退历史: " + backStack);
System.out.println("前进历史: " + forwardStack);
}
}

public void browserHistoryDemo() {
BrowserHistory browser = new BrowserHistory();

browser.visit("google.com");
browser.visit("github.com");
browser.visit("stackoverflow.com");

System.out.println("\n=== 浏览器历史 ===");
browser.showHistory();

System.out.println("\n=== 后退操作 ===");
browser.back(); // 回到 github.com
browser.back(); // 回到 google.com

System.out.println("\n=== 前进操作 ===");
browser.forward(); // 前进到 github.com
browser.visit("embracechw.top"); // 访问新页面

browser.showHistory();
}

// LRU缓存实现
public static class LRUCache<K, V> {
private final int capacity;
private final Deque<K> accessOrder;
private final Map<K, V> cache;

public LRUCache(int capacity) {
this.capacity = capacity;
this.accessOrder = new ArrayDeque<>();
this.cache = new HashMap<>();
}

public V get(K key) {
V value = cache.get(key);
if (value != null) {
// 更新访问顺序
accessOrder.remove(key);
accessOrder.addLast(key);
}
return value;
}

public void put(K key, V value) {
V existing = cache.get(key);

if (existing != null) {
// 更新现有值
cache.put(key, value);
// 更新访问顺序
accessOrder.remove(key);
accessOrder.addLast(key);
} else {
// 新增元素
if (cache.size() >= capacity) {
// 淘汰最少使用的元素
K lruKey = accessOrder.pollFirst();
if (lruKey != null) {
cache.remove(lruKey);
System.out.println("淘汰: " + lruKey);
}
}

cache.put(key, value);
accessOrder.addLast(key);
System.out.println("添加: " + key);
}
}

public void display() {
System.out.println("缓存状态: " + cache);
System.out.println("访问顺序: " + accessOrder);
}
}

public void lruCacheDemo() {
LRUCache<String, Integer> cache = new LRUCache<>(3);

cache.put("A", 1);
cache.put("B", 2);
cache.put("C", 3);
cache.display();

System.out.println("\n=== 访问 A ===");
cache.get("A");
cache.display();

System.out.println("\n=== 添加 D (触发淘汰) ===");
cache.put("D", 4);
cache.display();

System.out.println("\n=== 添加 E (触发淘汰) ===");
cache.put("E", 5);
cache.display();
}
}
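
The Deque-based LRU cache above calls accessOrder.remove(key) on every hit, which scans the deque and costs O(n). Where that matters, a LinkedHashMap in access-order mode is a commonly used alternative with constant-time updates. The sketch below is one such variant (LinkedLruCache is a hypothetical name) and is not the approach used in the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical alternative to the Deque-based LRUCache above.
class LinkedLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LinkedLruCache(int capacity) {
        super(16, 0.75f, true); // third argument true = iterate in access order
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; returning true evicts the least recently used entry
        return size() > capacity;
    }

    public static void main(String[] args) {
        LinkedLruCache<String, Integer> cache = new LinkedLruCache<>(3);
        cache.put("A", 1);
        cache.put("B", 2);
        cache.put("C", 3);
        cache.get("A");                     // A becomes the most recently used
        cache.put("D", 4);                  // evicts B
        System.out.println(cache.keySet()); // [C, A, D]
    }
}
```

Like the Deque version, this sketch is not thread-safe; concurrent use would need external synchronization.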

🚀 Concurrent queues

ConcurrentLinkedQueue: a lock-free concurrent queue

public class ConcurrentLinkedQueueAnalysis {

// CAS操作核心
private static class Node<E> {
volatile E item;
volatile Node<E> next;

Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}

boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
}

// 无锁入队操作
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是最后一个节点,尝试链接新节点
if (p.casNext(null, newNode)) {
// 链接成功,尝试更新tail
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// CAS失败,重试
}
else if (p == q)
// 遇到哨兵节点,从head重新开始
p = (t != (t = tail)) ? t : head;
else
// 移动到下一个节点
p = (p != t && t != (t = tail)) ? t : q;
}
}

// 无锁出队操作
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

if (item != null && p.casItem(item, null)) {
// 成功取出item
if (p != h) // hop two nodes at a time
casHead(h, p);
return item;
}
else if ((q = p.next) == null) {
// 队列为空
return null;
}
else if (p == q) {
// 遇到哨兵节点,重新开始
continue restartFromHead;
}
else {
// 移动到下一个节点
p = q;
}
}
}
}
}

Concurrent queue performance tests

public class ConcurrentQueuePerformance {

// 单线程性能测试
@Benchmark
public void singleThreadQueueTest() {
int operations = 1_000_000;

// ArrayDeque
long start = System.nanoTime();
Deque<Integer> arrayDeque = new ArrayDeque<>();
for (int i = 0; i < operations; i++) {
arrayDeque.offer(i);
if (i % 100 == 0) {
arrayDeque.poll();
}
}
long arrayDequeTime = System.nanoTime() - start;

// ConcurrentLinkedQueue
start = System.nanoTime();
Queue<Integer> concurrentQueue = new ConcurrentLinkedQueue<>();
for (int i = 0; i < operations; i++) {
concurrentQueue.offer(i);
if (i % 100 == 0) {
concurrentQueue.poll();
}
}
long concurrentQueueTime = System.nanoTime() - start;

System.out.printf("ArrayDeque: %.2f ms%n", arrayDequeTime / 1_000_000.0);
System.out.printf("ConcurrentLinkedQueue: %.2f ms%n", concurrentQueueTime / 1_000_000.0);
System.out.printf("性能差异: %.1fx%n", (double) concurrentQueueTime / arrayDequeTime);
}

// 多线程性能测试
public void multiThreadQueueTest() throws InterruptedException {
int threadCount = 4;
int operationsPerThread = 250_000;
CountDownLatch latch = new CountDownLatch(threadCount);

// 测试ArrayDeque(非线程安全)
Deque<Integer> unsafeDeque = new ArrayDeque<>();
ThreadUnsafeProducer[] unsafeProducers = new ThreadUnsafeProducer[threadCount];

for (int i = 0; i < threadCount; i++) {
unsafeProducers[i] = new ThreadUnsafeProducer(unsafeDeque, operationsPerThread, latch);
}

long unsafeStart = System.nanoTime();
for (ThreadUnsafeProducer producer : unsafeProducers) {
producer.start();
}
latch.await();
long unsafeTime = System.nanoTime() - unsafeStart;

// 测试ConcurrentLinkedQueue(线程安全)
Queue<Integer> safeQueue = new ConcurrentLinkedQueue<>();
CountDownLatch safeLatch = new CountDownLatch(threadCount);
ThreadSafeProducer[] safeProducers = new ThreadSafeProducer[threadCount];

for (int i = 0; i < threadCount; i++) {
safeProducers[i] = new ThreadSafeProducer(safeQueue, operationsPerThread, safeLatch);
}

long safeStart = System.nanoTime();
for (ThreadSafeProducer producer : safeProducers) {
producer.start();
}
safeLatch.await();
long safeTime = System.nanoTime() - safeStart;

System.out.printf("ArrayDeque (线程不安全): %.2f ms, 结果大小: %d%n",
unsafeTime / 1_000_000.0, unsafeDeque.size());
System.out.printf("ConcurrentLinkedQueue (线程安全): %.2f ms, 结果大小: %d%n",
safeTime / 1_000_000.0, safeQueue.size());
}

static class ThreadUnsafeProducer extends Thread {
private final Deque<Integer> deque;
private final int operations;
private final CountDownLatch latch;

ThreadUnsafeProducer(Deque<Integer> deque, int operations, CountDownLatch latch) {
this.deque = deque;
this.operations = operations;
this.latch = latch;
}

@Override
public void run() {
try {
for (int i = 0; i < operations; i++) {
deque.offer(i);
if (i % 10 == 0 && !deque.isEmpty()) {
deque.poll();
}
}
} finally {
latch.countDown();
}
}
}

static class ThreadSafeProducer extends Thread {
private final Queue<Integer> queue;
private final int operations;
private final CountDownLatch latch;

ThreadSafeProducer(Queue<Integer> queue, int operations, CountDownLatch latch) {
this.queue = queue;
this.operations = operations;
this.latch = latch;
}

@Override
public void run() {
try {
for (int i = 0; i < operations; i++) {
queue.offer(i);
if (i % 10 == 0) {
queue.poll();
}
}
} finally {
latch.countDown();
}
}
}
}
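
The multi-threaded test above mixes timing with correctness. For the correctness half alone, a smaller check is enough: several threads offer into the same ConcurrentLinkedQueue and no element may be lost. The sketch below assumes a hypothetical ConcurrentOfferDemo class; running the same method against an ArrayDeque would be a data race whose outcome is unpredictable, so it is not shown.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the original guide.
class ConcurrentOfferDemo {

    /** Starts the given number of threads, each offering perThread items; returns the final size. */
    static int fillConcurrently(Queue<Integer> queue, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.offer(i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every producer before reading the size
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        int size = fillConcurrently(new ConcurrentLinkedQueue<>(), 4, 10_000);
        System.out.println("size = " + size); // size = 40000
    }
}
```

Note that size() on a ConcurrentLinkedQueue traverses the whole list, so it is fine for a final check like this but a poor fit for hot paths.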

⏰ DelayQueue

DelayQueue: principles and implementation

public class DelayQueueAnalysis<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {

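// Elements are stored in an internal PriorityQueue, ordered by remaining delay
private final transient PriorityQueue<E> q;

// Lock that guards all access to the queue
private final transient ReentrantLock lock = new ReentrantLock();

// Condition signalled when a new head element may have become available
private final transient Condition available = lock.newCondition();

// Leader thread: the only thread doing a timed wait for the head element (leader-follower pattern)
private transient Thread leader = null;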

// 获取延迟元素
@Override
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await(); // 队列为空,等待
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll(); // 延迟时间已到,返回元素

first = null; // don't retain ref while waiting

if (leader != null)
available.await(); // 已有leader线程,等待
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 等待剩余时间
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 唤醒下一个等待线程
lock.unlock();
}
}
}

DelayQueue in practice

public class DelayedTaskExample {

// 延迟任务实现
public static class DelayedTask implements Delayed {
private final String name;
private final long executeTime;
private final long createTime;

public DelayedTask(String name, long delayMillis) {
this.name = name;
this.createTime = System.currentTimeMillis();
this.executeTime = this.createTime + delayMillis;
}

@Override
public long getDelay(TimeUnit unit) {
long remainingTime = executeTime - System.currentTimeMillis();
return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
if (this == o) return 0;

long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

public void execute() {
System.out.printf("执行任务: %s, 创建时间: %d, 执行时间: %d, 延迟: %d ms%n",
name, createTime, System.currentTimeMillis(),
System.currentTimeMillis() - createTime);
}

@Override
public String toString() {
return String.format("DelayedTask{name='%s', delay=%d ms}",
name, executeTime - createTime);
}
}

// Task scheduler
public static class TaskScheduler {
private final DelayQueue<DelayedTask> delayQueue;
private final ExecutorService executor;
private volatile boolean running = false;
private volatile Thread consumer; // kept so stop() can interrupt a blocked take()

public TaskScheduler(int workerThreads) {
this.delayQueue = new DelayQueue<>();
this.executor = Executors.newFixedThreadPool(workerThreads);
}

public void start() {
running = true;

// Start the consumer thread
consumer = new Thread(() -> {
while (running && !Thread.currentThread().isInterrupted()) {
try {
DelayedTask task = delayQueue.take(); // blocks until a task is due
executor.submit(() -> {
try {
task.execute();
} catch (Exception e) {
System.err.println("Task failed: " + task.name);
e.printStackTrace();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "DelayQueue-Consumer");

consumer.start();
System.out.println("Task scheduler started");
}

public void schedule(String taskName, long delayMillis) {
DelayedTask task = new DelayedTask(taskName, delayMillis);
delayQueue.offer(task);
System.out.printf("Scheduled task: %s, delay: %d ms%n", taskName, delayMillis);
}

public void stop() {
running = false;
if (consumer != null) {
consumer.interrupt(); // without this the consumer stays blocked in take() forever
}
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("Task scheduler stopped");
}

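// Report queue status without removing anything.
// DelayQueue.drainTo() removes only expired elements, so using it here
// would take due tasks away from the consumer thread.
public QueueStatus getQueueStatus() {
int pending = 0;
for (DelayedTask task : delayQueue) { // weakly consistent iterator over all elements
if (task.getDelay(TimeUnit.MILLISECONDS) > 0) {
pending++; // not yet due
}
}
return new QueueStatus(pending, delayQueue.size());
}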
}

public static class QueueStatus {
private final int pendingCount;
private final int totalCount;

public QueueStatus(int pendingCount, int totalCount) {
this.pendingCount = pendingCount;
this.totalCount = totalCount;
}

@Override
public String toString() {
return String.format("QueueStatus{pending=%d, total=%d}", pendingCount, totalCount);
}
}

public void delayQueueDemo() throws InterruptedException {
TaskScheduler scheduler = new TaskScheduler(2);
scheduler.start();

// 添加不同延迟的任务
scheduler.schedule("即时任务", 0);
scheduler.schedule("1秒后任务", 1000);
scheduler.schedule("3秒后任务", 3000);
scheduler.schedule("2秒后任务", 2000);
scheduler.schedule("5秒后任务", 5000);

// 定期查看队列状态
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
QueueStatus status = scheduler.getQueueStatus();
System.out.println("队列状态: " + status);
}, 1, 1, TimeUnit.SECONDS);

// 运行8秒后停止
Thread.sleep(8000);
scheduler.stop();
monitor.shutdown();
}

// 订单超时处理
public static class OrderTimeoutManager {
private final DelayQueue<OrderTimeout> timeoutQueue;
private final Map<String, Order> activeOrders;
private final ExecutorService timeoutExecutor;

public OrderTimeoutManager() {
this.timeoutQueue = new DelayQueue<>();
this.activeOrders = new ConcurrentHashMap<>();
this.timeoutExecutor = Executors.newSingleThreadExecutor();
startTimeoutProcessor();
}

// 订单类
public static class Order {
private final String orderId;
private final String customerId;
private final BigDecimal amount;
private final long createTime;
private volatile OrderStatus status;

public Order(String orderId, String customerId, BigDecimal amount) {
this.orderId = orderId;
this.customerId = customerId;
this.amount = amount;
this.createTime = System.currentTimeMillis();
this.status = OrderStatus.PENDING;
}

// getters and setters...
}

// 订单超时事件
private static class OrderTimeout implements Delayed {
private final String orderId;
private final long timeoutTime;

public OrderTimeout(String orderId, long timeoutMillis) {
this.orderId = orderId;
this.timeoutTime = System.currentTimeMillis() + timeoutMillis;
}

@Override
public long getDelay(TimeUnit unit) {
long remaining = timeoutTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
return Long.compare(this.timeoutTime, ((OrderTimeout) o).timeoutTime);
}
}

// 订单状态枚举
public enum OrderStatus {
PENDING, CONFIRMED, CANCELLED, TIMEOUT
}

public void createOrder(Order order, long timeoutMillis) {
activeOrders.put(order.getOrderId(), order);
OrderTimeout timeout = new OrderTimeout(order.getOrderId(), timeoutMillis);
timeoutQueue.offer(timeout);
System.out.printf("Created order: %s, timeout: %d ms%n",
order.getOrderId(), timeoutMillis);
}

public void confirmOrder(String orderId) {
Order order = activeOrders.get(orderId);
if (order != null && order.getStatus() == OrderStatus.PENDING) {
order.setStatus(OrderStatus.CONFIRMED);
System.out.printf("订单确认: %s%n", orderId);
}
}

public void cancelOrder(String orderId) {
Order order = activeOrders.remove(orderId);
if (order != null) {
order.setStatus(OrderStatus.CANCELLED);
System.out.printf("订单取消: %s%n", orderId);
}
}

private void startTimeoutProcessor() {
timeoutExecutor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
OrderTimeout timeout = timeoutQueue.take();
Order order = activeOrders.get(timeout.orderId);

if (order != null && order.getStatus() == OrderStatus.PENDING) {
order.setStatus(OrderStatus.TIMEOUT);
activeOrders.remove(timeout.orderId);
System.out.printf("订单超时: %s, 金额: %.2f%n",
order.getOrderId(), order.getAmount());

// 这里可以发送通知、退款等处理
handleOrderTimeout(order);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}

private void handleOrderTimeout(Order order) {
// 超时处理逻辑
System.out.printf("处理超时订单: 发送退款通知给客户 %s%n", order.getCustomerId());
}

public void shutdown() {
timeoutExecutor.shutdown();
try {
if (!timeoutExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
timeoutExecutor.shutdownNow();
}
} catch (InterruptedException e) {
timeoutExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

public void orderTimeoutDemo() throws InterruptedException {
OrderTimeoutManager timeoutManager = new OrderTimeoutManager();

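// Create a few orders (Order is nested inside OrderTimeoutManager, so it must be qualified)
OrderTimeoutManager.Order order1 = new OrderTimeoutManager.Order("ORD001", "CUST001", new BigDecimal("299.00"));
OrderTimeoutManager.Order order2 = new OrderTimeoutManager.Order("ORD002", "CUST002", new BigDecimal("1599.00"));
OrderTimeoutManager.Order order3 = new OrderTimeoutManager.Order("ORD003", "CUST003", new BigDecimal("89.00"));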

timeoutManager.createOrder(order1, 3000); // 3秒超时
timeoutManager.createOrder(order2, 5000); // 5秒超时
timeoutManager.createOrder(order3, 2000); // 2秒超时

// 模拟订单确认
Thread.sleep(1000);
timeoutManager.confirmOrder("ORD001");

// 等待处理完成
Thread.sleep(6000);

timeoutManager.shutdown();
}
}
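
Both Delayed implementations above read System.currentTimeMillis(), which can jump when the wall clock is adjusted. A variant worth considering stores a System.nanoTime() deadline and compares deadlines directly. The sketch below assumes a hypothetical NanoDelayed class and shows that take() hands elements out in deadline order regardless of insertion order.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical Delayed implementation based on the monotonic clock.
class NanoDelayed implements Delayed {
    final String name;
    private final long deadlineNanos;

    NanoDelayed(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof NanoDelayed) {
            // Compare by difference: nanoTime values may wrap, differences do not
            long diff = deadlineNanos - ((NanoDelayed) other).deadlineNanos;
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    /** Offers a slow and a fast element, then takes both; returns the names in take order. */
    static String takeInOrder() {
        DelayQueue<NanoDelayed> queue = new DelayQueue<>();
        queue.offer(new NanoDelayed("slow", 60));
        queue.offer(new NanoDelayed("fast", 10));
        try {
            return queue.take().name + "," + queue.take().name;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(takeInOrder()); // fast,slow
    }
}
```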

🎯 Queue in practice and best practices

1️⃣ Producer-consumer pattern

public class ProducerConsumerPattern {

// 基础的生产者-消费者实现
public static class BasicProducerConsumer<T> {
private final BlockingQueue<T> queue;
private final ExecutorService executor;
private final int consumerCount; // stored because startConsumers() needs it

public BasicProducerConsumer(int queueCapacity, int consumerCount) {
this.queue = new ArrayBlockingQueue<>(queueCapacity);
this.executor = Executors.newFixedThreadPool(consumerCount);
this.consumerCount = consumerCount;
}

public void startConsumers(Consumer<T> consumer) {
for (int i = 0; i < consumerCount; i++) {
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
T item = queue.take(); // 阻塞等待
consumer.accept(item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}

public boolean produce(T item) {
try {
return queue.offer(item, 1, TimeUnit.SECONDS); // 1秒超时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}

public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

// 日志处理系统
public static class LogProcessingSystem {
private final BlockingQueue<LogEvent> logQueue;
private final ExecutorService processorPool;
private final AtomicBoolean running = new AtomicBoolean(true);

public static class LogEvent {
private final String message;
private final LogLevel level;
private final long timestamp;
private final String threadName;

public LogEvent(String message, LogLevel level) {
this.message = message;
this.level = level;
this.timestamp = System.currentTimeMillis();
this.threadName = Thread.currentThread().getName();
}

// getters...
}

public enum LogLevel {
DEBUG, INFO, WARN, ERROR
}

public LogProcessingSystem(int queueSize, int processorCount) {
this.logQueue = new LinkedBlockingQueue<>(queueSize);
this.processorPool = Executors.newFixedThreadPool(processorCount);
startProcessors(processorCount);
}

// ExecutorService has no getMaximumPoolSize(), so the count is passed in explicitly
private void startProcessors(int processorCount) {
for (int i = 0; i < processorCount; i++) {
processorPool.submit(new LogProcessor());
}
}

private class LogProcessor implements Runnable {
@Override
public void run() {
List<LogEvent> batch = new ArrayList<>(100);

while (running.get() || !logQueue.isEmpty()) {
try {
// 批量拉取日志事件
LogEvent first = logQueue.poll(1, TimeUnit.SECONDS);
if (first != null) {
batch.add(first);

// 继续拉取更多事件(非阻塞)
logQueue.drainTo(batch, 99);

// 批量处理
processBatch(batch);
batch.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

private void processBatch(List<LogEvent> events) {
// 按级别分组处理
Map<LogLevel, List<LogEvent>> eventsByLevel = events.stream()
.collect(Collectors.groupingBy(LogEvent::getLevel));

// 处理错误日志(需要立即处理)
eventsByLevel.getOrDefault(LogLevel.ERROR, Collections.emptyList())
.forEach(this::processErrorLog);

// 处理警告日志
eventsByLevel.getOrDefault(LogLevel.WARN, Collections.emptyList())
.forEach(this::processWarnLog);

// 处理信息日志
eventsByLevel.getOrDefault(LogLevel.INFO, Collections.emptyList())
.forEach(this::processInfoLog);

// 处理调试日志(低优先级)
eventsByLevel.getOrDefault(LogLevel.DEBUG, Collections.emptyList())
.forEach(this::processDebugLog);
}

private void processErrorLog(LogEvent event) {
System.err.printf("[ERROR] %s - %s: %s%n",
new Date(event.getTimestamp()),
event.getThreadName(),
event.getMessage());
// 可以发送到错误监控系统、发送邮件等
}

private void processWarnLog(LogEvent event) {
System.out.printf("[WARN] %s - %s: %s%n",
new Date(event.getTimestamp()),
event.getThreadName(),
event.getMessage());
}

private void processInfoLog(LogEvent event) {
System.out.printf("[INFO] %s - %s: %s%n",
new Date(event.getTimestamp()),
event.getThreadName(),
event.getMessage());
}

private void processDebugLog(LogEvent event) {
// 调试日志可以选择性处理或丢弃
// System.out.printf("[DEBUG] %s: %s%n",
// event.getThreadName(), event.getMessage());
}
}

public void log(String message, LogLevel level) {
LogEvent event = new LogEvent(message, level);
if (!logQueue.offer(event)) {
System.err.println("日志队列已满,丢弃日志: " + message);
}
}

public void shutdown() {
running.set(false);
processorPool.shutdown();
try {
if (!processorPool.awaitTermination(10, TimeUnit.SECONDS)) {
processorPool.shutdownNow();
}
} catch (InterruptedException e) {
processorPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

public void logProcessingDemo() throws InterruptedException {
LogProcessingSystem logSystem = new LogProcessingSystem(1000, 2);

// 启动日志生产者
ExecutorService producers = Executors.newFixedThreadPool(3);

producers.submit(() -> {
for (int i = 0; i < 50; i++) {
logSystem.log("This is an info log " + i, LogProcessingSystem.LogLevel.INFO);
// Restore the interrupt flag and stop instead of swallowing the exception
try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
}
});

producers.submit(() -> {
for (int i = 0; i < 20; i++) {
logSystem.log("This is a warning log " + i, LogProcessingSystem.LogLevel.WARN);
try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
}
});

producers.submit(() -> {
for (int i = 0; i < 10; i++) {
logSystem.log("This is an error log " + i, LogProcessingSystem.LogLevel.ERROR);
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
}
});

// 运行一段时间后关闭
Thread.sleep(5000);
producers.shutdown();
logSystem.shutdown();
}
}
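
The `log` method above depends on `offer` returning `false` once the bounded queue is full. The following is a minimal sketch of that drop-on-overflow behavior in isolation. The class `BoundedLogBuffer` and its method names are hypothetical and not part of the system above; the timed `offer` overload is shown as one possible way to apply brief backpressure before giving up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for illustration; the names are assumptions, not from the original system.
final class BoundedLogBuffer {
    private final BlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedLogBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: returns false and counts a drop when the queue is full.
    boolean tryLog(String message) {
        boolean accepted = queue.offer(message);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Waits up to the given timeout for free space before counting a drop.
    boolean tryLog(String message, long timeout, TimeUnit unit) throws InterruptedException {
        boolean accepted = queue.offer(message, timeout, unit);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Removes and returns the oldest message, or null when the buffer is empty.
    String next() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

Counting drops rather than only printing them makes overflow visible to monitoring, which is usually the first signal that the queue capacity or the number of consumers needs adjusting.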

2️⃣ Rate Limiter Implementations

public class RateLimiterImplementations {

// Queue-based token bucket rate limiter
public static class TokenBucketRateLimiter {
private final BlockingQueue<Token> bucket;
private final ScheduledExecutorService refillScheduler;
private final int capacity;
private final int refillRate;

private static class Token {
private final long timestamp;

Token() {
this.timestamp = System.currentTimeMillis();
}
}

public TokenBucketRateLimiter(int capacity, int refillRatePerSecond) {
this.capacity = capacity;
this.refillRate = refillRatePerSecond;
this.bucket = new ArrayBlockingQueue<>(capacity);
this.refillScheduler = Executors.newSingleThreadScheduledExecutor();

// Fill the bucket initially
for (int i = 0; i < capacity; i++) {
bucket.offer(new Token());
}

// Refill tokens once per second
refillScheduler.scheduleAtFixedRate(() -> {
int currentSize = bucket.size();
int tokensToAdd = Math.min(refillRate, capacity - currentSize);

for (int i = 0; i < tokensToAdd; i++) {
if (!bucket.offer(new Token())) {
break; // bucket is full
}
}
}, 1, 1, TimeUnit.SECONDS);
}

public boolean tryAcquire() {
return bucket.poll() != null;
}

public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return bucket.poll(timeout, unit) != null;
}

public void acquire() throws InterruptedException {
bucket.take(); // block until a token is available
}

public int getAvailableTokens() {
return bucket.size();
}

public void shutdown() {
refillScheduler.shutdown();
try {
if (!refillScheduler.awaitTermination(1, TimeUnit.SECONDS)) {
refillScheduler.shutdownNow();
}
} catch (InterruptedException e) {
refillScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

// Leaky bucket rate limiter (uses a queue as the buffer)
public static class LeakyBucketRateLimiter {
private final BlockingQueue<Runnable> bucket;
private final ScheduledExecutorService leakScheduler;
private final int capacity;
private final int leakRate;

public LeakyBucketRateLimiter(int capacity, int leakRatePerSecond) {
this.capacity = capacity;
this.leakRate = leakRatePerSecond;
this.bucket = new LinkedBlockingQueue<>(capacity);
this.leakScheduler = Executors.newSingleThreadScheduledExecutor();

// Periodically "leak": run at most leakRate queued tasks per second
leakScheduler.scheduleAtFixedRate(() -> {
int processed = 0;
Runnable task;
while (processed < leakRate && (task = bucket.poll()) != null) {
processed++; // count every attempt so failing tasks still respect the rate
try {
task.run();
} catch (Exception e) {
System.err.println("Task execution failed: " + e.getMessage());
}
}
}, 0, 1, TimeUnit.SECONDS);
}

public boolean submit(Runnable task) {
return bucket.offer(task);
}

public boolean submit(Runnable task, long timeout, TimeUnit unit) throws InterruptedException {
return bucket.offer(task, timeout, unit);
}

public int getQueueSize() {
return bucket.size();
}

public void shutdown() {
leakScheduler.shutdown();
try {
if (!leakScheduler.awaitTermination(1, TimeUnit.SECONDS)) {
leakScheduler.shutdownNow();
}
} catch (InterruptedException e) {
leakScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

// Sliding window rate limiter
public static class SlidingWindowRateLimiter {
private final Deque<Long> requestTimestamps;
private final int maxRequests;
private final long windowSizeMillis;

public SlidingWindowRateLimiter(int maxRequests, long windowSizeMillis) {
this.maxRequests = maxRequests;
this.windowSizeMillis = windowSizeMillis;
this.requestTimestamps = new ArrayDeque<>();
}

public synchronized boolean tryAcquire() {
long now = System.currentTimeMillis();

// Evict request timestamps that have fallen out of the window
while (!requestTimestamps.isEmpty() &&
now - requestTimestamps.peekFirst() > windowSizeMillis) {
requestTimestamps.pollFirst();
}

// Admit the request only if the window still has room
if (requestTimestamps.size() < maxRequests) {
requestTimestamps.offerLast(now);
return true;
}

return false;
}

public synchronized int getCurrentRequests() {
long now = System.currentTimeMillis();

// Evict request timestamps that have fallen out of the window
while (!requestTimestamps.isEmpty() &&
now - requestTimestamps.peekFirst() > windowSizeMillis) {
requestTimestamps.pollFirst();
}

return requestTimestamps.size();
}

public synchronized long getWaitTimeMillis() {
if (requestTimestamps.size() < maxRequests) {
return 0;
}

long oldestRequest = requestTimestamps.peekFirst();
// Clamp at zero: the oldest entry may already have expired without being evicted yet
return Math.max(0, windowSizeMillis - (System.currentTimeMillis() - oldestRequest));
}
}

public void rateLimiterDemo() throws InterruptedException {
System.out.println("=== Token bucket rate limiter test ===");
TokenBucketRateLimiter tokenBucket = new TokenBucketRateLimiter(10, 2); // capacity 10, refills 2 tokens per second

for (int i = 0; i < 15; i++) {
if (tokenBucket.tryAcquire()) {
System.out.println("Request " + (i + 1) + " allowed");
} else {
System.out.println("Request " + (i + 1) + " rejected");
}
Thread.sleep(200);
}

System.out.println("\n=== Sliding window rate limiter test ===");
SlidingWindowRateLimiter slidingWindow = new SlidingWindowRateLimiter(5, 1000); // at most 5 requests per second

for (int i = 0; i < 10; i++) {
if (slidingWindow.tryAcquire()) {
System.out.println("Request " + (i + 1) + " allowed, requests in window: " + slidingWindow.getCurrentRequests());
} else {
System.out.println("Request " + (i + 1) + " rejected, wait time: " + slidingWindow.getWaitTimeMillis() + "ms");
}
Thread.sleep(200);
}

tokenBucket.shutdown();
}
}
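
The limiters above read `System.currentTimeMillis()` directly, which makes their timing behavior hard to verify in a unit test. One possible variation, sketched below, is to pass the clock in as a `LongSupplier` so a test can advance time by hand. The class name `TestableSlidingWindowLimiter` and the `clock` parameter are assumptions introduced for this illustration; the eviction logic mirrors the sliding window limiter shown earlier.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Hypothetical variant of the sliding window limiter with an injectable clock.
final class TestableSlidingWindowLimiter {
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private final int maxRequests;
    private final long windowMillis;
    private final LongSupplier clock; // assumed to return the current time in milliseconds

    TestableSlidingWindowLimiter(int maxRequests, long windowMillis, LongSupplier clock) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    // Records the request and returns true if the window still has room.
    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        evictExpired(now);
        if (timestamps.size() < maxRequests) {
            timestamps.addLast(now);
            return true;
        }
        return false;
    }

    // Milliseconds until the oldest request leaves the window; 0 if a slot is free.
    synchronized long waitTimeMillis() {
        long now = clock.getAsLong();
        evictExpired(now);
        if (timestamps.size() < maxRequests) {
            return 0;
        }
        return Math.max(0, windowMillis - (now - timestamps.peekFirst()));
    }

    // Drops timestamps older than the window, oldest first.
    private void evictExpired(long now) {
        while (!timestamps.isEmpty() && now - timestamps.peekFirst() > windowMillis) {
            timestamps.pollFirst();
        }
    }
}
```

In production code the clock would typically be `System::currentTimeMillis`; in a test it can be a lambda over a mutable value, so no `Thread.sleep` calls are needed.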

⚡ Performance Optimization Guide

1. Queue Selection Strategy

public class QueuePerformanceOptimization {

// Pick a suitable queue implementation for each scenario
public static Queue<Integer> selectOptimalQueue(QueueScenario scenario, int expectedSize) {
switch (scenario) {
case HIGH_CONCURRENCY:
return new ConcurrentLinkedQueue<>();

case BOUNDED_CAPACITY:
return new ArrayBlockingQueue<>(expectedSize);

case PRIORITY_PROCESSING:
PriorityQueue<Integer> pq = new PriorityQueue<>(expectedSize);
return pq;

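case LIFO_OPERATIONS:
// Through the Queue interface this is still FIFO; callers need Deque.push/pop for stack behavior
return new ArrayDeque<>();

case MEMORY_EFFICIENT:
return new ArrayDeque<>(expectedSize); // array-backed FIFO: no per-element node objects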

case SEQUENTIAL_ACCESS:
return new ArrayDeque<>();

default:
return new ArrayDeque<>();
}
}

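public enum QueueScenario {
HIGH_CONCURRENCY, // many threads accessing concurrently
BOUNDED_CAPACITY, // fixed upper bound on size
PRIORITY_PROCESSING, // elements handled by priority
LIFO_OPERATIONS, // last in, first out
MEMORY_EFFICIENT, // low memory footprint
SEQUENTIAL_ACCESS // plain in-order processing
}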

// Capacity pre-allocation
public static void capacityOptimization() {
int expectedSize = 10000;

// ❌ Avoid: the default capacity (11) forces repeated resizing
Queue<String> badQueue = new PriorityQueue<>();
for (int i = 0; i < expectedSize; i++) {
badQueue.offer("item" + i); // may trigger several array resizes
}

// ✅ Prefer: pre-size with the expected capacity
Queue<String> goodQueue = new PriorityQueue<>(expectedSize);
for (int i = 0; i < expectedSize; i++) {
goodQueue.offer("item" + i); // no resizing needed
}

// ArrayBlockingQueue requires its capacity up front and never grows
BlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(expectedSize);
}

// Batch processing optimization
public static class BatchQueueProcessor<T> {
private final BlockingQueue<T> queue;
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
private final ExecutorService executor;

public BatchQueueProcessor(BlockingQueue<T> queue, int batchSize,
Consumer<List<T>> batchProcessor) {
this.queue = queue;
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
this.executor = Executors.newSingleThreadExecutor();
}

public void start() {
executor.submit(() -> {
List<T> batch = new ArrayList<>(batchSize);

while (!Thread.currentThread().isInterrupted()) {
try {
// Wait for the first element (blocks for up to 1 second)
T first = queue.poll(1, TimeUnit.SECONDS);
if (first != null) {
batch.add(first);

// Grab whatever else is immediately available (non-blocking)
queue.drainTo(batch, batchSize - 1);

// Process the batch
if (!batch.isEmpty()) {
batchProcessor.accept(new ArrayList<>(batch));
batch.clear();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}

// Flush any elements left in the batch
if (!batch.isEmpty()) {
batchProcessor.accept(batch);
}
});
}

public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

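// Memory optimization techniques
public static class MemoryOptimizedQueues {

// Primitive-type queues avoid boxing (requires a third-party library)
public void primitiveTypeQueue() {
// Using the fastutil library, e.g. IntArrayFIFOQueue
// (check the class name against your fastutil version):
// IntPriorityQueue intQueue = new IntArrayFIFOQueue();
// for (int i = 0; i < 1000000; i++) {
// intQueue.enqueue(i);
// }
}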

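// Release the backing array of a drained queue
public static <T> void clearQueueWhenEmpty(Queue<T> queue, int checkInterval) {
if (queue.isEmpty()) {
// For queues that once grew large, dropping them explicitly helps the GC
if (queue instanceof PriorityQueue || queue instanceof ArrayDeque) {
// These implementations keep their internal array at its grown size.
// The caller can replace its own reference with a fresh instance to free it;
// reassigning the parameter here would have no effect on the caller.
// queue = new ArrayDeque<>();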
}
}
}

// Use WeakReference to avoid memory leaks
public static class WeakReferenceQueue<T> {
private final Queue<WeakReference<T>> queue = new ConcurrentLinkedQueue<>();
private final ReferenceQueue<T> referenceQueue = new ReferenceQueue<>();

public void offer(T item) {
queue.offer(new WeakReference<>(item, referenceQueue));
cleanupReferences();
}

public T poll() {
cleanupReferences();

WeakReference<T> ref;
// Poll directly: another thread may empty the queue between isEmpty() and poll()
while ((ref = queue.poll()) != null) {
T item = ref.get();
if (item != null) {
return item;
}
}
return null;
}

private void cleanupReferences() {
Reference<? extends T> ref;
while ((ref = referenceQueue.poll()) != null) {
// Remove references whose referents have been garbage-collected
queue.remove(ref);
}
}
}
}
}
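
The batch processor above combines a blocking `poll` with `drainTo`. The sketch below isolates the `drainTo` part to show how a backlog splits into batches; the class and method names (`DrainToBatching`, `drainInBatches`) are hypothetical and chosen only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper illustrating BlockingQueue.drainTo; not part of the classes above.
final class DrainToBatching {

    // Drains everything currently queued into batches of at most batchSize elements.
    static <T> List<List<T>> drainInBatches(BlockingQueue<T> queue, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        while (true) {
            List<T> batch = new ArrayList<>(batchSize);
            // drainTo never blocks; it returns the number of elements transferred.
            int moved = queue.drainTo(batch, batchSize);
            if (moved == 0) {
                return batches;
            }
            batches.add(batch);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 7; i++) {
            queue.add(i);
        }
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(drainInBatches(queue, 3));
    }
}
```

Moving several elements per call usually means fewer lock acquisitions than polling them one at a time, which is where most of the benefit tends to come from under contention.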

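📚 Queue Interview Checklist

🎯 Fundamentals (must know)

  • Core differences between Queue and Deque
  • The FIFO and LIFO principles
  • Which queue implementation fits which scenario
  • Blocking versus non-blocking queues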
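
As a quick refresher on the FIFO and LIFO items in this list, here is a minimal sketch that runs the same values through an `ArrayDeque` used first as a queue and then as a stack. The class `FifoVsLifo` and its method names are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Queue;

// Hypothetical demo class; only standard java.util types are used.
final class FifoVsLifo {

    // Queue view: elements leave in insertion order (first in, first out).
    static List<Integer> fifoOrder(int... values) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int v : values) {
            queue.offer(v); // insert at the tail
        }
        List<Integer> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll()); // remove from the head
        }
        return out;
    }

    // Deque used as a stack: elements leave in reverse insertion order (last in, first out).
    static List<Integer> lifoOrder(int... values) {
        Deque<Integer> stack = new ArrayDeque<>();
        for (int v : values) {
            stack.push(v); // insert at the head
        }
        List<Integer> out = new ArrayList<>();
        while (!stack.isEmpty()) {
            out.add(stack.pop()); // remove from the head
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fifoOrder(1, 2, 3)); // prints [1, 2, 3]
        System.out.println(lifoOrder(1, 2, 3)); // prints [3, 2, 1]
    }
}
```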

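🚀 Advanced Topics (interview bonus points)

  • How PriorityQueue is built on a binary heap
  • The circular-array design of ArrayDeque
  • The lock-free CAS mechanism in ConcurrentLinkedQueue
  • DelayQueue as a time-driven queue
  • How queues grow their capacity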
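
To make the circular-array idea concrete, the following is a simplified fixed-capacity ring buffer for `int` values. It is a teaching sketch under stated assumptions, not the JDK's `ArrayDeque` source: the class name `IntRingDeque` is invented here, it uses plain modulo arithmetic, and it rejects insertions when full instead of resizing.

```java
import java.util.NoSuchElementException;

// Illustrative fixed-capacity ring buffer; not the JDK's ArrayDeque implementation.
final class IntRingDeque {
    private final int[] elements;
    private int head; // index of the first element
    private int size; // number of stored elements

    IntRingDeque(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        elements = new int[capacity];
    }

    // Appends at the tail; returns false when the buffer is full.
    boolean offerLast(int value) {
        if (size == elements.length) {
            return false;
        }
        elements[(head + size) % elements.length] = value;
        size++;
        return true;
    }

    // Prepends at the head by stepping the head index backwards with wrap-around.
    boolean offerFirst(int value) {
        if (size == elements.length) {
            return false;
        }
        head = (head - 1 + elements.length) % elements.length;
        elements[head] = value;
        size++;
        return true;
    }

    // Removes from the head and advances the head index with wrap-around.
    int pollFirst() {
        if (size == 0) {
            throw new NoSuchElementException("deque is empty");
        }
        int value = elements[head];
        head = (head + 1) % elements.length;
        size--;
        return value;
    }

    // Removes from the tail; only the size changes, the head stays put.
    int pollLast() {
        if (size == 0) {
            throw new NoSuchElementException("deque is empty");
        }
        int value = elements[(head + size - 1) % elements.length];
        size--;
        return value;
    }

    int size() {
        return size;
    }
}
```

Because both ends only move an index, every operation is O(1) and no elements are ever shifted, which is the property that makes array-backed deques fast at both head and tail.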

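🔧 Practical Skills (project work)

  • Implementing the producer-consumer pattern
  • Designing and implementing rate limiters
  • Building a task scheduler
  • Performance tuning techniques
  • Avoiding memory leaks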
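
For the producer-consumer item, one common way to stop a consumer cleanly is a sentinel value (often called a poison pill) placed on the queue after the last real element. The sketch below assumes `Integer.MIN_VALUE` is never a real payload; the class `PoisonPillPipeline` and the method `sumThroughQueue` are hypothetical names for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical producer-consumer sketch using a bounded queue and a poison pill.
final class PoisonPillPipeline {
    // Sentinel that tells the consumer to stop; assumed never to be a real value.
    private static final Integer POISON = Integer.MIN_VALUE;

    // Sends 1..count through a bounded queue and returns the sum seen by the consumer.
    static long sumThroughQueue(int count, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        long[] total = new long[1];

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take(); // blocks while the queue is empty
                    if (value == POISON) {
                        return;
                    }
                    total[0] += value;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= count; i++) {
                queue.put(i); // blocks while the queue is full (backpressure)
            }
            queue.put(POISON);
            consumer.join(); // join makes the consumer's writes visible to this thread
        } catch (InterruptedException e) {
            consumer.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        return total[0];
    }
}
```

With several consumers, each one needs its own poison pill, or the first consumer to see the pill can put it back before exiting.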

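💎 Summary

As a key part of the Java Collections Framework, Queue plays a central role in data flow, task scheduling, and flow control:

Key Takeaways

  • 🎯 ArrayDeque as the workhorse: the first-choice implementation for a high-performance double-ended queue
  • 🚀 PriorityQueue for ordering: heap-based priority processing
  • 🔄 ConcurrentLinkedQueue for high concurrency: a strong default for lock-free, unbounded concurrent access
  • DelayQueue for time-driven work: well suited to delayed tasks and timeout handling
  • 📦 Blocking queues for backpressure: the core building block of the producer-consumer pattern
  • Performance tuning starts with estimation: avoid repeated resizing and pick the right data structure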

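Practical Advice

  1. Single-threaded, high performance: prefer ArrayDeque
  2. Priority ordering: use PriorityQueue and pre-size its capacity
  3. High concurrency: choose a lock-free implementation such as ConcurrentLinkedQueue
  4. Blocking semantics: use ArrayBlockingQueue or LinkedBlockingQueue
  5. Time-driven processing: DelayQueue is a natural fit for delayed tasks
  6. Batch operations: use bulk methods such as drainTo to improve throughput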
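
As an illustration of the DelayQueue advice, the sketch below shows that elements come out in order of expiry rather than insertion. The `DelayedMessage` and `DelayQueueSketch` classes and the delay values are assumptions made up for this example; only `DelayQueue`, `Delayed`, and `TimeUnit` come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type: becomes available once its delay has elapsed.
final class DelayedMessage implements Delayed {
    final String name;
    private final long readyAtNanos;

    DelayedMessage(String name, long delayMillis) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the element has expired.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof DelayedMessage) {
            // Compare by difference, which stays correct if nanoTime values wrap.
            return Long.compare(readyAtNanos - ((DelayedMessage) other).readyAtNanos, 0L);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueSketch {
    // Inserts "slow" before "fast" and returns the names in the order take() yields them.
    static List<String> takeInExpiryOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("slow", 150));
        queue.put(new DelayedMessage("fast", 20));

        List<String> order = new ArrayList<>();
        try {
            order.add(queue.take().name); // blocks until the earliest delay expires
            order.add(queue.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return order;
    }
}
```

Here `take()` returns "fast" first even though "slow" was inserted earlier, because the queue orders elements by remaining delay.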

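With these points in hand, you can use Queue and Deque confidently in interviews and in day-to-day development! 🚀

Study tip: start with the circular-array design of ArrayDeque, then the heap structure of PriorityQueue, and finish with the lock-free mechanisms of the concurrent queues. Pay particular attention to the performance characteristics and suitable scenarios of each queue.

Remember: choosing the right queue data structure often matters more than micro-optimizing the algorithm!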