跳到主要内容

Java 并发集合完全指南

"并发集合是高性能Java应用的基石" —— 掌握JUC集合框架,构建安全高效的多线程应用

🎯 快速索引

🔄 并发编程基础

线程安全的挑战

// ❌ 非线程安全示例:ArrayList在多线程环境下的问题
/**
 * Deliberate counter-example: a plain ArrayList shared between threads with no
 * synchronization. It is kept unsafe on purpose to illustrate the problem.
 */
public class UnsafeExample {
// Backing list; ArrayList performs no internal synchronization.
private final List<Integer> list = new ArrayList<>();

// Appends an element without any locking.
public void addElement(int element) {
list.add(element); // concurrent callers may lose updates or hit ArrayIndexOutOfBoundsException
}

// Returns the current size without any locking.
public int getSize() {
return list.size(); // may observe an inconsistent state while another thread is writing
}
}

// ✅ 线程安全解决方案
/**
 * Three thread-safe alternatives to a shared ArrayList, shown side by side.
 * addElement writes the same value into all three collections.
 */
public class SafeExample {
// Option 1: a Set view backed by a ConcurrentHashMap
private final Set<Integer> concurrentSet = ConcurrentHashMap.newKeySet();

// Option 2: CopyOnWriteArrayList (intended for read-mostly workloads)
private final List<Integer> copyOnWriteList = new CopyOnWriteArrayList<>();

// Option 3: Collections.synchronizedList wrapper (every call goes through one lock, so it scales worst)
private final List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());

// Adds the element to each of the three collections in turn.
public void addElement(int element) {
concurrentSet.add(element); // ConcurrentHashMap-backed: no collection-wide lock
copyOnWriteList.add(element); // reads take no lock; each write copies the backing array
synchronizedList.add(element); // serialized on a single mutex
}
}

并发集合的核心价值

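| 特性 | 传统集合同步 | 并发集合 | 性能优势 |
| --- | --- | --- | --- |
| 锁粒度 | 整个对象加锁 | 细粒度锁/无锁 | 减少锁竞争 |
| 读操作 | 需要获取锁 | 通常无锁 | 读性能大幅提升 |
| 写操作 | 串行执行 | 并发执行 | 写性能显著改善 |
| 迭代器 | 需要额外同步 | 弱一致性 | 避免ConcurrentModificationException |
| 特殊语义 | 需要手动实现 | 内置支持 | 阻塞、超时、原子操作 |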

🏗️ 并发集合架构概览

JUC集合框架全景

设计原则与技术选型

| 设计模式 | 适用场景 | 典型实现 | 性能特征 |
| --- | --- | --- | --- |
| 分段锁 | 高并发读写 | ConcurrentHashMap (JDK7) | 读写并发,读多写少 |
| CAS无锁 | 高频操作,低竞争 | ConcurrentLinkedQueue | 延迟低,CPU友好 |
| 写时复制 | 读远多于写 | CopyOnWriteArrayList | 读极快,写较慢 |
| 阻塞机制 | 生产者-消费者 | BlockingQueue家族 | 自动背压控制 |
| 跳表结构 | 需要排序 | ConcurrentSkipListMap | 有序,性能稳定 |

⚙️ 核心设计原理

1. CAS无锁算法

/**
 * Walk-through of compare-and-set (CAS) programming: an emulated CAS on an
 * array slot, a retry-loop counter, and a lock-free stack.
 */
public class CASPrinciple {

    /**
     * Emulates CAS on one array slot.
     *
     * @param array    array holding the slot (also used as the monitor)
     * @param index    slot to update
     * @param expected value the slot must currently hold
     * @param update   value to store when the expectation holds
     * @return true if the slot held {@code expected} and was overwritten
     */
    public static boolean compareAndSet(int[] array, int index, int expected, int update) {
        // A real CAS is one atomic operation; the monitor only stands in for it in this demo.
        synchronized (array) {
            boolean matched = array[index] == expected;
            if (matched) {
                array[index] = update;
            }
            return matched;
        }
    }

    // Practical use: a counter updated through CAS.
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Adds one to the counter, retrying the CAS until it wins. */
    public void increment() {
        while (true) {
            int seen = counter.get();    // read the current value
            int wanted = seen + 1;       // compute the replacement
            if (counter.compareAndSet(seen, wanted)) {
                return;
            }
        }
    }

    /** Lock-free stack: the head pointer is swapped with CAS. */
    private static class ConcurrentStack<E> {
        private final AtomicReference<Node<E>> top = new AtomicReference<>();

        /** Singly linked stack cell. */
        private static class Node<E> {
            final E item;
            volatile Node<E> next;

            Node(E item) {
                this.item = item;
            }
        }

        /** Pushes {@code item} onto the stack. */
        public void push(E item) {
            Node<E> fresh = new Node<>(item);
            for (;;) {
                Node<E> head = top.get();
                fresh.next = head;
                if (top.compareAndSet(head, fresh)) {
                    return;
                }
            }
        }

        /** Pops the top item, or returns null when the stack is empty. */
        public E pop() {
            for (;;) {
                Node<E> head = top.get();
                if (head == null) {
                    return null;
                }
                if (top.compareAndSet(head, head.next)) {
                    return head.item;
                }
            }
        }
    }
}

2. 写时复制(Copy-On-Write)机制

/**
 * Demonstrates the copy-on-write technique with a stripped-down list.
 */
public class CopyOnWriteMechanism {

    /**
     * Simplified copy-on-write list. Writers take a lock, copy the backing
     * array, modify the copy and publish it through a volatile field; readers
     * only ever dereference the currently published array.
     */
    private static class SimpleCopyOnWriteList<E> {
        // Published arrays are never modified in place; every write installs a new one.
        private volatile Object[] array = new Object[0];
        private final ReentrantLock lock = new ReentrantLock();

        /**
         * Returns the element at {@code index} without locking.
         *
         * @throws ArrayIndexOutOfBoundsException if the index is out of range
         */
        @SuppressWarnings("unchecked") // only E instances are ever stored by add()
        public E get(int index) {
            return (E) getArray()[index];
        }

        /**
         * Appends {@code e} by copying the backing array under the write lock.
         *
         * @return always true
         */
        public boolean add(E e) {
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1); // copy
                newElements[len] = e;                                    // modify the copy
                setArray(newElements);                                   // publish
                return true;
            } finally {
                lock.unlock();
            }
        }

        private Object[] getArray() {
            return array;
        }

        private void setArray(Object[] a) {
            array = a;
        }

        /**
         * Returns an iterator over the array published at call time. Later
         * writes are invisible to it, so it never throws
         * ConcurrentModificationException.
         */
        public Iterator<E> iterator() {
            // No defensive clone: a published array is immutable by construction.
            return new SnapshotIterator<>(getArray());
        }

        /** Read-only cursor over a fixed array snapshot. */
        private static class SnapshotIterator<E> implements Iterator<E> {
            private final Object[] snapshot;
            private int cursor;

            SnapshotIterator(Object[] snapshot) {
                this.snapshot = snapshot;
            }

            @Override
            public boolean hasNext() {
                return cursor < snapshot.length;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    // Iterator contract: signal exhaustion explicitly.
                    throw new java.util.NoSuchElementException();
                }
                return (E) snapshot[cursor++];
            }
        }
    }
}

3. 分段锁机制

/**
 * Contrasts two lock granularities for a concurrent map: a hand-rolled
 * lock-striped map, and plain ConcurrentHashMap usage on JDK 8+.
 */
public class SegmentedLockPrinciple {

    /**
     * Simplified lock-striped map: keys are spread over a fixed number of
     * segments, each a HashMap guarded by its own ReentrantLock.
     */
    private static class SegmentedConcurrentMap<K, V> {
        private final int segmentCount;
        private final Segment<K, V>[] segments;

        /** Creates a map with {@code segmentCount} independently locked segments. */
        @SuppressWarnings("unchecked")
        public SegmentedConcurrentMap(int segmentCount) {
            this.segmentCount = segmentCount;
            Segment<K, V>[] stripes = new Segment[segmentCount];
            for (int slot = 0; slot < stripes.length; slot++) {
                stripes[slot] = new Segment<>();
            }
            this.segments = stripes;
        }

        /** One stripe: a HashMap whose every access holds the stripe's lock. */
        private static class Segment<K, V> {
            private final ReentrantLock lock = new ReentrantLock();
            private final HashMap<K, V> map = new HashMap<>();

            public V put(K key, V value) {
                lock.lock();
                try {
                    V previous = map.put(key, value);
                    return previous;
                } finally {
                    lock.unlock();
                }
            }

            public V get(K key) {
                lock.lock();
                try {
                    V current = map.get(key);
                    return current;
                } finally {
                    lock.unlock();
                }
            }
        }

        /** Picks the segment for a key from its hash code. */
        private Segment<K, V> segmentForKey(Object key) {
            int remainder = key.hashCode() % segmentCount;
            // The remainder may be negative for negative hashes; fold it back.
            int slot = remainder < 0 ? -remainder : remainder;
            return segments[slot];
        }

        /** Stores the mapping in the key's segment and returns the previous value. */
        public V put(K key, V value) {
            Segment<K, V> owner = segmentForKey(key);
            return owner.put(key, value);
        }

        /** Looks the key up in its segment. */
        public V get(K key) {
            Segment<K, V> owner = segmentForKey(key);
            return owner.get(key);
        }
    }

    /**
     * JDK 8+ approach: ConcurrentHashMap dropped segments and synchronizes on
     * the head node of an individual bin (list or tree) instead.
     */
    public static class BucketLockExample {

        /** Exercises basic and atomic compound operations on a ConcurrentHashMap. */
        public static void optimizedConcurrentAccess() {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

            // Keys that hash to the same bin still contend with each other,
            // but far less than they would under one global lock.
            map.put("key1", 1);
            map.put("key2", 2);
            map.put("key3", 3);

            // Reads are almost always lock-free.
            Integer value = map.get("key1");

            // Prefer the atomic methods for compound check-then-act updates.
            map.computeIfAbsent("key4", String::length);
            map.merge("key5", 1, Integer::sum);
        }
    }
}

🗺️ ConcurrentHashMap深度解析

JDK 8+ 核心优化

public class ConcurrentHashMapDeepDive {

// 1. 核心数据结构
/*
+-------------------+
| Node[] table | // 哈希表数组
+-------------------+
| sizeCtl | // 控制大小和扩容
+-------------------+
| transferIndex | // 扩容时的索引
+-------------------+
*/

// 2. 节点类型
// Schematic sketch of the node kinds named in this section. The classes declare
// fields only (no constructors, no Map.Entry methods), so they illustrate layout
// and are not compilable as written.
static class NodeTypes {
// Plain node: hash, key, value and the link to the next node in the bin
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}

// TreeNode - red-black tree node
static class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
boolean red;
}

// TreeBin - head node of a red-black tree bin
static final class TreeBin<K,V> extends Node<K,V> {
volatile TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
}

// ForwardingNode - forwarding node used while the table is being resized
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
}
}

// 3. 实战应用:高性能缓存
/**
 * Expire-after-write cache on top of ConcurrentHashMap. Entries carry an
 * absolute expiry timestamp; a single scheduled thread sweeps expired entries
 * once per expiry period. Call {@link #shutdown()} to stop the sweeper.
 *
 * @param <K> key type
 * @param <V> value type
 */
public static class HighPerformanceCache<K, V> {
    private final ConcurrentHashMap<K, CacheEntry<V>> cache;
    private final ScheduledExecutorService cleanupExecutor;
    private final long expireAfterWriteMillis;

    /**
     * @param initialCapacity        initial sizing hint for the backing map
     * @param expireAfterWriteMillis entry lifetime in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code expireAfterWriteMillis} is not positive
     */
    public HighPerformanceCache(int initialCapacity, long expireAfterWriteMillis) {
        if (expireAfterWriteMillis <= 0) {
            // Fail before creating the executor; scheduleAtFixedRate would reject it anyway.
            throw new IllegalArgumentException(
                    "expireAfterWriteMillis must be positive: " + expireAfterWriteMillis);
        }
        this.cache = new ConcurrentHashMap<>(initialCapacity);
        this.expireAfterWriteMillis = expireAfterWriteMillis;
        this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor();

        // Periodically sweep expired entries.
        cleanupExecutor.scheduleAtFixedRate(this::cleanupExpiredEntries,
                expireAfterWriteMillis, expireAfterWriteMillis, TimeUnit.MILLISECONDS);
    }

    /** Stores {@code value} under {@code key}, restarting its lifetime. */
    public void put(K key, V value) {
        cache.put(key, newEntry(value));
    }

    /**
     * Returns the live value for {@code key}, or null when it is absent or expired.
     */
    public V get(K key) {
        CacheEntry<V> entry = cache.get(key);
        if (entry == null) {
            return null;
        }
        if (entry.isExpired()) {
            // Conditional remove: only evict the entry we actually saw, never a
            // fresher one another thread put in the meantime.
            cache.remove(key, entry);
            return null;
        }
        return entry.getValue();
    }

    /**
     * Returns the live value for {@code key}, computing and caching it when the
     * key is absent or its entry has expired. A null result from
     * {@code mappingFunction} is not cached.
     *
     * @return the cached or newly computed value; null if the function returned null
     */
    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        CacheEntry<V> entry = cache.compute(key, (k, existing) -> {
            if (existing != null && !existing.isExpired()) {
                return existing;
            }
            V value = mappingFunction.apply(k);
            if (value == null) {
                return null; // drops any expired mapping and records nothing
            }
            return newEntry(value);
        });
        return entry == null ? null : entry.getValue();
    }

    /** Wraps a value with an expiry timestamp counted from now. */
    private CacheEntry<V> newEntry(V value) {
        return new CacheEntry<V>(value, System.currentTimeMillis() + expireAfterWriteMillis);
    }

    /** Removes every entry that has expired; runs on the cleanup thread. */
    private void cleanupExpiredEntries() {
        cache.forEach((key, entry) -> {
            if (entry.isExpired()) {
                cache.remove(key, entry); // conditional: leave concurrently refreshed entries alone
            }
        });
    }

    /** Immutable value plus its absolute expiry time in epoch milliseconds. */
    private static class CacheEntry<V> {
        private final V value;
        private final long expireTime;

        public CacheEntry(V value, long expireTime) {
            this.value = value;
            this.expireTime = expireTime;
        }

        public V getValue() {
            return value;
        }

        public boolean isExpired() {
            return System.currentTimeMillis() > expireTime;
        }
    }

    /** Stops the background cleanup thread. */
    public void shutdown() {
        cleanupExecutor.shutdown();
    }
}

// 4. 分布式计数器
/**
 * Named counters: one LongAdder per key inside a ConcurrentHashMap.
 */
public static class DistributedCounter {
    private final ConcurrentHashMap<String, LongAdder> counters;

    /** Creates an empty counter registry. */
    public DistributedCounter() {
        this.counters = new ConcurrentHashMap<>();
    }

    /** Adds one to the counter for {@code key}, creating it on first use. */
    public void increment(String key) {
        LongAdder adder = counters.computeIfAbsent(key, ignored -> new LongAdder());
        adder.increment();
    }

    /** Returns the current sum for {@code key}, or 0 if the key has no counter. */
    public long getCount(String key) {
        LongAdder adder = counters.get(key);
        if (adder == null) {
            return 0L;
        }
        return adder.sum();
    }

    /** Returns a copy of all counters as key -> current sum. */
    public Map<String, Long> getAllCounts() {
        return counters.entrySet()
                .stream()
                .collect(Collectors.toMap(
                        slot -> slot.getKey(),
                        slot -> slot.getValue().sum()));
    }

    /** Drops the counter for {@code key}. */
    public void reset(String key) {
        counters.remove(key);
    }

    /** Drops every counter. */
    public void resetAll() {
        counters.clear();
    }
}
}

性能调优技巧

/**
 * ConcurrentHashMap tuning recipes: pre-sizing, the concurrency-level hint,
 * atomic compound updates, bulk operations and lightweight monitoring.
 */
public class ConcurrentHashMapTuning {

    /**
     * 1. Capacity pre-allocation: size the map once so the bulk load below
     * does not trigger resizes.
     */
    public void capacityPreAllocation() {
        int expectedSize = 100000;
        float loadFactor = 0.75f;

        // Pass the expected element count directly: the constructor already
        // sizes the table for that many elements at the given load factor.
        // Dividing by the load factor here as well would over-allocate.
        ConcurrentHashMap<String, String> optimizedMap =
                new ConcurrentHashMap<>(expectedSize, loadFactor, 16); // 16 = concurrency-level hint

        for (int i = 0; i < expectedSize; i++) {
            optimizedMap.put("key" + i, "value" + i);
        }
    }

    /** 2. Concurrency-level tuning: relevant on JDK 7, only a sizing hint on JDK 8+. */
    public void concurrencyLevelTuning() {
        int processorCount = Runtime.getRuntime().availableProcessors();

        // JDK 7: a concurrency level of roughly 4x the core count was a common choice, e.g.
        //   new ConcurrentHashMap(16, 0.75f, processorCount * 4);

        // JDK 8+: no concurrency level needs to be set; the default constructor is enough.
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
    }

    /** 3. Use the map's atomic methods instead of racy check-then-act sequences. */
    public void atomicOperations() {
        ConcurrentHashMap<String, AtomicInteger> counterMap = new ConcurrentHashMap<>();

        // Wrong: get() followed by put() lets two threads both see "absent"
        // and overwrite each other's counter.
        //   AtomicInteger counter = counterMap.get("key");
        //   if (counter == null) { counterMap.put("key", new AtomicInteger(1)); }
        //   else { counter.incrementAndGet(); }

        // Right: create-if-missing and increment through an atomic method.
        counterMap.computeIfAbsent("key", k -> new AtomicInteger(0)).incrementAndGet();

        // Alternative: merge() folds the new amount into the existing counter.
        counterMap.merge("key", new AtomicInteger(1), (oldVal, newVal) -> {
            oldVal.addAndGet(newVal.get());
            return oldVal;
        });
    }

    /** 4. Bulk operations: putAll and a parallel stream into a concurrent map. */
    public void batchOperations() {
        ConcurrentHashMap<String, Integer> sourceMap = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, Integer> targetMap = new ConcurrentHashMap<>();

        // Fill the source data.
        for (int i = 0; i < 10000; i++) {
            sourceMap.put("key" + i, i);
        }

        // Bulk copy.
        targetMap.putAll(sourceMap);

        // Parallel filter into a concurrent result map.
        Map<String, Integer> filteredResult = sourceMap.entrySet()
                .parallelStream()
                .filter(entry -> entry.getValue() % 2 == 0)
                .collect(Collectors.toConcurrentMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue));
    }

    /**
     * 5. Monitoring: prints the map's mapping count every 10 seconds.
     * The reporting thread is a daemon so that it cannot keep the JVM alive,
     * because this method returns no handle for shutting it down.
     */
    public void monitoring() {
        ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();

        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "chm-monitor");
            worker.setDaemon(true);
            return worker;
        });
        monitor.scheduleAtFixedRate(() -> {
            // mappingCount() is the long-valued counterpart of size(); under
            // concurrent updates both are estimates, not exact values.
            long size = map.mappingCount();

            // ConcurrentHashMap does not expose its table capacity, so no load
            // factor can be derived here; only the mapping count is reported.
            System.out.printf("Map size: %d%n", size);

            // Heavy resizing can only be inferred indirectly, e.g. from GC and memory metrics.
        }, 10, 10, TimeUnit.SECONDS);
    }
}

📝 CopyOnWrite系列解析

CopyOnWriteArrayList实战应用

/**
 * Typical CopyOnWriteArrayList use cases: listener registries, configuration
 * lists, online-session tracking, and a small cost demonstration.
 */
public class CopyOnWriteApplications {

    /** 1. Event listener registry: the classic copy-on-write use case. */
    public static class EventManager {
        private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();

        /** Registers a listener unless an equal one is already present. */
        public void addListener(EventListener listener) {
            listeners.addIfAbsent(listener);
        }

        /** Unregisters a listener. */
        public void removeListener(EventListener listener) {
            listeners.remove(listener);
        }

        /**
         * Delivers {@code event} to every listener. Iteration runs over a
         * snapshot without locking; a failing listener is reported on
         * System.err and does not stop delivery to the others.
         */
        public void fireEvent(Event event) {
            for (EventListener listener : listeners) {
                try {
                    listener.onEvent(event);
                } catch (Exception e) {
                    System.err.println("Listener execution failed: " + e.getMessage());
                }
            }
        }

        /** Callback invoked for each fired event. */
        public interface EventListener {
            void onEvent(Event event);
        }

        /** Immutable event carrying a type tag and a payload. */
        public static class Event {
            private final String type;
            private final Object data;

            public Event(String type, Object data) {
                this.type = type;
                this.data = data;
            }

            public String getType() { return type; }
            public Object getData() { return data; }
        }
    }

    /** 2. Configuration store kept as a copy-on-write list of items. */
    public static class ConfigurationManager {
        private final CopyOnWriteArrayList<ConfigItem> configItems = new CopyOnWriteArrayList<>();

        /**
         * Replaces the item for {@code key}, or appends one if the key is new.
         * Synchronized because find-then-set/add is a compound action: without
         * it two writers could both miss the key and append duplicates.
         * Readers stay lock-free.
         */
        public synchronized void updateConfig(String key, String value) {
            ConfigItem newItem = new ConfigItem(key, value, System.currentTimeMillis());

            for (int i = 0; i < configItems.size(); i++) {
                if (configItems.get(i).getKey().equals(key)) {
                    configItems.set(i, newItem);
                    return;
                }
            }

            configItems.add(newItem);
        }

        /** Returns the value stored for {@code key}, or null if there is none. */
        public String getConfig(String key) {
            return configItems.stream()
                    .filter(item -> item.getKey().equals(key))
                    .map(ConfigItem::getValue)
                    .findFirst()
                    .orElse(null);
        }

        /** Returns an unmodifiable copy of all items, shielding the internal list. */
        public List<ConfigItem> getAllConfigs() {
            return Collections.unmodifiableList(new ArrayList<>(configItems));
        }

        /** Immutable key/value pair with its last update time (epoch millis). */
        private static class ConfigItem {
            private final String key;
            private final String value;
            private final long updateTime;

            public ConfigItem(String key, String value, long updateTime) {
                this.key = key;
                this.value = value;
                this.updateTime = updateTime;
            }

            public String getKey() { return key; }
            public String getValue() { return value; }
            public long getUpdateTime() { return updateTime; }
        }
    }

    /** 3. Online user tracking. */
    public static class OnlineUserManager {
        private final CopyOnWriteArrayList<UserSession> onlineUsers = new CopyOnWriteArrayList<>();

        /** Records a session unless an equal one is already registered. */
        public void userLogin(UserSession session) {
            onlineUsers.addIfAbsent(session);
        }

        /** Removes every session belonging to {@code userId}. */
        public void userLogout(String userId) {
            onlineUsers.removeIf(session -> session.getUserId().equals(userId));
        }

        /** Returns true if at least one session exists for {@code userId}. */
        public boolean isUserOnline(String userId) {
            return onlineUsers.stream()
                    .anyMatch(session -> session.getUserId().equals(userId));
        }

        /** Returns a mutable copy of the current sessions. */
        public List<UserSession> getOnlineUsers() {
            return new ArrayList<>(onlineUsers);
        }

        public int getOnlineUserCount() {
            return onlineUsers.size();
        }

        /** Removes sessions idle for more than 30 minutes relative to {@code currentTime}. */
        public void cleanupExpiredSessions(long currentTime) {
            long expireTime = currentTime - TimeUnit.MINUTES.toMillis(30);
            onlineUsers.removeIf(session -> session.getLastActiveTime() < expireTime);
        }

        /** Session record; only the last-active timestamp is mutable. */
        public static class UserSession {
            private final String userId;
            private final String sessionId;
            private final long loginTime;
            private volatile long lastActiveTime;

            public UserSession(String userId, String sessionId) {
                this.userId = userId;
                this.sessionId = sessionId;
                this.loginTime = System.currentTimeMillis();
                this.lastActiveTime = this.loginTime;
            }

            public String getUserId() { return userId; }
            public String getSessionId() { return sessionId; }
            public long getLoginTime() { return loginTime; }
            public long getLastActiveTime() { return lastActiveTime; }

            public void updateActiveTime() {
                this.lastActiveTime = System.currentTimeMillis();
            }
        }
    }

    /** 4. Rough memory and timing demonstration of copy-on-write costs. */
    public static class CopyOnWriteMonitor {
        private final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

        /** Prints used heap before and after single and bulk writes. */
        public void demonstrateMemoryUsage() {
            // Fill the initial data.
            for (int i = 0; i < 10000; i++) {
                list.add("Item " + i);
            }

            MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();

            System.out.println("添加前堆内存: " + memoryBean.getHeapMemoryUsage().getUsed() / 1024 + " KB");

            // A single write copies the whole backing array.
            list.add("New Item");

            System.out.println("添加后堆内存: " + memoryBean.getHeapMemoryUsage().getUsed() / 1024 + " KB");

            // Many writes mean many array copies.
            for (int i = 0; i < 100; i++) {
                list.add("Bulk Item " + i);
            }

            System.out.println("批量添加后堆内存: " + memoryBean.getHeapMemoryUsage().getUsed() / 1024 + " KB");
        }

        /**
         * Times 100000 reads and 1000 writes and prints the per-operation
         * averages together with the write/read cost ratio.
         */
        public void performanceTest() {
            final int readCount = 100000;
            final int writeCount = 1000;

            if (list.isEmpty()) {
                // Seed one element so the read benchmark has something to read;
                // otherwise i % list.size() divides by zero.
                list.add("Item 0");
            }

            // Read benchmark.
            long startTime = System.nanoTime();
            for (int i = 0; i < readCount; i++) {
                String item = list.get(i % list.size());
            }
            long readTime = System.nanoTime() - startTime;

            // Write benchmark.
            startTime = System.nanoTime();
            for (int i = 0; i < writeCount; i++) {
                list.add("Performance Test " + i);
            }
            long writeTime = System.nanoTime() - startTime;

            double avgRead = (double) readTime / readCount;
            double avgWrite = (double) writeTime / writeCount;

            System.out.printf("读取操作平均时间: %.2f ns%n", avgRead);
            System.out.printf("写入操作平均时间: %.2f ns%n", avgWrite);
            // Ratio of the two per-operation averages (average write cost : average read cost).
            System.out.printf("读写性能比: %.1f:1%n", avgWrite / avgRead);
        }
    }
}

CopyOnWriteArraySet特性

/**
 * CopyOnWriteArraySet use cases: a whitelist and a tag registry.
 */
public class CopyOnWriteSetApplications {

    /**
     * Whitelist backed by a CopyOnWriteArraySet. Lookups are lock-free;
     * mutations are serialized so that a batch replacement is atomic with
     * respect to both readers and other writers.
     */
    public static class WhiteListManager {
        // Swapped wholesale by batchUpdateWhiteList; volatile so readers see the new set at once.
        private volatile CopyOnWriteArraySet<String> whiteList = new CopyOnWriteArraySet<>();

        /** Adds {@code item}; returns false if it was already present. */
        public synchronized boolean addToWhiteList(String item) {
            return whiteList.add(item);
        }

        /** Removes {@code item}; returns false if it was not present. */
        public synchronized boolean removeFromWhiteList(String item) {
            return whiteList.remove(item);
        }

        /** Returns true if {@code item} is whitelisted. */
        public boolean isInWhiteList(String item) {
            return whiteList.contains(item);
        }

        /**
         * Atomically replaces the whole whitelist with {@code newItems}.
         * The new set is built first and then published with a single
         * reference write, so readers observe either the old or the new list
         * and never an empty or half-filled one.
         */
        public synchronized void batchUpdateWhiteList(Collection<String> newItems) {
            whiteList = new CopyOnWriteArraySet<>(newItems);
        }

        /** Returns a mutable copy of the current whitelist. */
        public Set<String> getWhiteListSnapshot() {
            return new HashSet<>(whiteList);
        }
    }

    /** Tag registry that stores tags lower-cased and trimmed. */
    public static class TagManager {
        private final CopyOnWriteArraySet<String> tags = new CopyOnWriteArraySet<>();

        /** Canonical tag form; Locale.ROOT keeps the result independent of the default locale. */
        private static String normalize(String tag) {
            return tag.toLowerCase(java.util.Locale.ROOT).trim();
        }

        public void addTag(String tag) {
            tags.add(normalize(tag));
        }

        public void removeTag(String tag) {
            tags.remove(normalize(tag));
        }

        public boolean hasTag(String tag) {
            return tags.contains(normalize(tag));
        }

        /** Returns a mutable copy of all registered tags. */
        public Set<String> getAllTags() {
            return new HashSet<>(tags);
        }

        /**
         * Returns true if {@code content} contains at least one of
         * {@code requiredTags}, compared case-insensitively.
         */
        public boolean matchesTags(String content, Set<String> requiredTags) {
            String lowerContent = content.toLowerCase(java.util.Locale.ROOT);
            return requiredTags.stream()
                    .anyMatch(tag -> lowerContent.contains(tag.toLowerCase(java.util.Locale.ROOT)));
        }
    }
}

🚶‍♂️ 并发队列体系

阻塞队列家族

/**
 * One small example per BlockingQueue implementation.
 */
public class BlockingQueueFamily {

    /** 1. ArrayBlockingQueue: bounded, array-backed work queue with pooled consumers. */
    public static class ArrayBlockingQueueExample {
        private final ArrayBlockingQueue<Task> taskQueue;
        private final ExecutorService executor;

        /**
         * @param capacity       maximum number of queued tasks
         * @param threadPoolSize number of consumer threads started immediately
         */
        public ArrayBlockingQueueExample(int capacity, int threadPoolSize) {
            this.taskQueue = new ArrayBlockingQueue<>(capacity);
            this.executor = Executors.newFixedThreadPool(threadPoolSize);

            // Start the consumers.
            for (int i = 0; i < threadPoolSize; i++) {
                executor.submit(this::consumerTask);
            }
        }

        /** Enqueues a task, blocking while the queue is full. */
        public void submitTask(Task task) throws InterruptedException {
            taskQueue.put(task);
        }

        /** Enqueues a task, giving up after the timeout; returns false on timeout. */
        public boolean trySubmitTask(Task task, long timeout, TimeUnit unit) throws InterruptedException {
            return taskQueue.offer(task, timeout, unit);
        }

        /** Consumer loop: takes and processes tasks until the thread is interrupted. */
        private void consumerTask() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Task task = taskQueue.take(); // blocks while the queue is empty
                    processTask(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void processTask(Task task) {
            System.out.println("Processing task: " + task.getId() + " by " + Thread.currentThread().getName());
            try {
                Thread.sleep(100); // simulate work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Stops the consumers. shutdownNow() is required: the consumers are
         * long-running loops parked in take(), and a plain shutdown() never
         * interrupts running tasks, so the threads would live forever.
         * Tasks still in the queue are not processed.
         */
        public void shutdown() {
            executor.shutdownNow();
        }

        /** Unit of work identified by an id. */
        static class Task {
            private final String id;

            public Task(String id) {
                this.id = id;
            }

            public String getId() {
                return id;
            }
        }
    }

    /** 2. LinkedBlockingQueue: optionally bounded linked queue (bounded to 1000 here). */
    public static class LinkedBlockingQueueExample {
        private final LinkedBlockingQueue<WorkItem> workQueue = new LinkedBlockingQueue<>(1000);

        /** Producer: enqueues 10000 items, blocking whenever the queue is full. */
        public void producer() {
            Random random = new Random();
            for (int i = 0; i < 10000; i++) {
                WorkItem item = new WorkItem("Task-" + i, random.nextInt(1000));
                try {
                    workQueue.put(item);
                    System.out.println("Produced: " + item.getId());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        /** Consumer: takes and processes items until interrupted. */
        public void consumer() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    WorkItem item = workQueue.take();
                    processWorkItem(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void processWorkItem(WorkItem item) {
            System.out.println("Consumed: " + item.getId() + " by " + Thread.currentThread().getName());
        }

        /** Work item with an id and a nominal processing time. */
        static class WorkItem {
            private final String id;
            private final int processingTime;

            public WorkItem(String id, int processingTime) {
                this.id = id;
                this.processingTime = processingTime;
            }

            public String getId() { return id; }
            public int getProcessingTime() { return processingTime; }
        }
    }

    /** 3. SynchronousQueue: zero-capacity hand-off between submitter and worker. */
    public static class SynchronousQueueExample {
        private final SynchronousQueue<Runnable> handoffQueue = new SynchronousQueue<>();
        private final ExecutorService executor;

        public SynchronousQueueExample() {
            this.executor = Executors.newCachedThreadPool();
            startWorkers();
        }

        /** Hands the task to a worker, blocking until one accepts it. */
        public void submitTask(Runnable task) throws InterruptedException {
            handoffQueue.put(task);
        }

        private void startWorkers() {
            for (int i = 0; i < 5; i++) {
                executor.submit(() -> {
                    try {
                        while (!Thread.currentThread().isInterrupted()) {
                            Runnable task = handoffQueue.take(); // direct hand-off
                            task.run();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }

        /**
         * Stops the workers. shutdownNow() interrupts workers parked in
         * take(); a plain shutdown() would leave them blocked forever.
         */
        public void shutdown() {
            executor.shutdownNow();
        }
    }

    /** 4. PriorityBlockingQueue: the highest numeric priority is served first. */
    public static class PriorityBlockingQueueExample {
        private final PriorityBlockingQueue<PriorityTask> taskQueue =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt(PriorityTask::getPriority).reversed());

        public void addTask(String description, int priority) {
            PriorityTask task = new PriorityTask(description, priority);
            taskQueue.put(task);
        }

        /** Blocks until a task is available and returns the highest-priority one. */
        public PriorityTask getNextTask() throws InterruptedException {
            return taskQueue.take();
        }

        /** Task with a description and an integer priority. */
        static class PriorityTask {
            private final String description;
            private final int priority;

            public PriorityTask(String description, int priority) {
                this.description = description;
                this.priority = priority;
            }

            public String getDescription() { return description; }
            public int getPriority() { return priority; }
        }
    }

    /** 5. DelayQueue: tasks become available only once their delay has elapsed. */
    public static class DelayQueueExample {
        private final DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();

        /** Schedules {@code task} to become runnable after {@code delayMillis}. */
        public void scheduleTask(Runnable task, long delayMillis) {
            DelayedTask delayedTask = new DelayedTask(task, delayMillis);
            delayQueue.put(delayedTask);
        }

        /** Starts a thread that runs tasks as their delays expire, until interrupted. */
        public void start() {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        DelayedTask delayedTask = delayQueue.take();
                        delayedTask.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        /** Runnable wrapper with an absolute due time in epoch milliseconds. */
        static class DelayedTask implements Delayed {
            private final Runnable task;
            private final long executeTime;

            public DelayedTask(Runnable task, long delayMillis) {
                this.task = task;
                this.executeTime = System.currentTimeMillis() + delayMillis;
            }

            @Override
            public long getDelay(TimeUnit unit) {
                long remaining = executeTime - System.currentTimeMillis();
                return unit.convert(remaining, TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed other) {
                if (other instanceof DelayedTask) {
                    // Compare fixed due times: reading the clock twice can make
                    // the ordering inconsistent (even x.compareTo(x) != 0).
                    return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
                }
                return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
                        other.getDelay(TimeUnit.MILLISECONDS));
            }

            public void run() {
                task.run();
            }
        }
    }
}

非阻塞队列应用

/**
 * Examples built on the non-blocking queues ConcurrentLinkedQueue and
 * ConcurrentLinkedDeque, plus a throughput comparison with LinkedBlockingQueue.
 */
public class NonBlockingQueueApplications {

    /** 1. ConcurrentLinkedQueue: unbounded non-blocking event queue with polling consumers. */
    public static class EventProcessor {
        private final ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean isRunning = new AtomicBoolean(false);
        private final ExecutorService processor = Executors.newFixedThreadPool(3);

        /** Starts three consumer loops; later calls while running do nothing. */
        public void start() {
            if (isRunning.compareAndSet(false, true)) {
                for (int i = 0; i < 3; i++) {
                    processor.submit(this::processEvents);
                }
            }
        }

        /** Enqueues an event; never blocks. */
        public void submitEvent(Event event) {
            eventQueue.offer(event);
        }

        /** Consumer loop: polls until stop() clears the running flag. */
        private void processEvents() {
            while (isRunning.get()) {
                Event event = eventQueue.poll(); // non-blocking
                if (event != null) {
                    handleEvent(event);
                } else {
                    // Nothing queued: back off briefly instead of spinning.
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }

        private void handleEvent(Event event) {
            System.out.println("Processing event: " + event.getId() +
                    " by " + Thread.currentThread().getName());
        }

        /** Clears the running flag and shuts the pool down. */
        public void stop() {
            isRunning.set(false);
            processor.shutdown();
        }

        /** Event with an id and a payload. */
        static class Event {
            private final String id;
            private final Object data;

            public Event(String id, Object data) {
                this.id = id;
                this.data = data;
            }

            public String getId() { return id; }
            public Object getData() { return data; }
        }
    }

    /** 2. ConcurrentLinkedDeque: workers share one deque and take from both ends. */
    public static class WorkStealingPool {
        private final ConcurrentLinkedDeque<Runnable> workQueue = new ConcurrentLinkedDeque<>();
        private final List<WorkerThread> workers;

        /** Creates and starts {@code workerCount} worker threads. */
        public WorkStealingPool(int workerCount) {
            this.workers = new ArrayList<>();
            for (int i = 0; i < workerCount; i++) {
                WorkerThread worker = new WorkerThread("Worker-" + i);
                workers.add(worker);
                worker.start();
            }
        }

        /** Appends a task at the tail of the shared deque. */
        public void submit(Runnable task) {
            workQueue.addLast(task);
        }

        private class WorkerThread extends Thread {
            public WorkerThread(String name) {
                super(name);
            }

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    Runnable task = workQueue.pollLast(); // newest first (LIFO)

                    if (task == null) {
                        task = stealTask();
                    }

                    if (task != null) {
                        try {
                            task.run();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } else {
                        // Idle: back off briefly.
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }

            /** Takes from the head of the deque (oldest first). */
            private Runnable stealTask() {
                return workQueue.pollFirst();
            }
        }

        /** Interrupts every worker thread. */
        public void shutdown() {
            workers.forEach(Thread::interrupt);
        }
    }

    /** 3. Throughput comparison between ConcurrentLinkedQueue and LinkedBlockingQueue. */
    public static class QueuePerformanceComparison {

        /** Runs the comparison with the default load: 1,000,000 operations, 4 producers, 4 consumers. */
        public void compareQueuePerformance() {
            compareQueuePerformance(1000000, 4, 4);
        }

        /**
         * Runs both benchmarks with the given load and prints the elapsed time of each.
         *
         * @param operationCount total items to transfer (rounded down to a multiple of producerCount)
         * @param producerCount  number of producer threads; must be positive
         * @param consumerCount  number of consumer threads; must be positive
         * @throws IllegalArgumentException if a count is out of range
         */
        public void compareQueuePerformance(int operationCount, int producerCount, int consumerCount) {
            if (operationCount < 0 || producerCount <= 0 || consumerCount <= 0) {
                throw new IllegalArgumentException("operationCount must be >= 0 and thread counts > 0");
            }
            testConcurrentLinkedQueue(operationCount, producerCount, consumerCount);
            testLinkedBlockingQueue(operationCount, producerCount, consumerCount);
        }

        private void testConcurrentLinkedQueue(int operationCount, int producerCount, int consumerCount) {
            ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
            CountDownLatch startLatch = new CountDownLatch(1);
            CountDownLatch endLatch = new CountDownLatch(producerCount + consumerCount);
            int perProducer = operationCount / producerCount;
            int totalItems = perProducer * producerCount;
            // Counts items actually consumed. Consumers stop only when every
            // produced item is accounted for, so they cannot quit early just
            // because they were scheduled before the first producer.
            AtomicInteger consumed = new AtomicInteger(0);

            // Producers.
            for (int i = 0; i < producerCount; i++) {
                new Thread(() -> {
                    try {
                        startLatch.await();
                        for (int j = 0; j < perProducer; j++) {
                            queue.offer(j);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        endLatch.countDown();
                    }
                }).start();
            }

            // Consumers.
            for (int i = 0; i < consumerCount; i++) {
                new Thread(() -> {
                    try {
                        startLatch.await();
                        while (consumed.get() < totalItems) {
                            if (queue.poll() != null) {
                                consumed.incrementAndGet();
                            } else {
                                Thread.yield(); // queue momentarily empty
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        endLatch.countDown();
                    }
                }).start();
            }

            long startTime = System.nanoTime();
            startLatch.countDown(); // release all threads at once
            try {
                endLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            long endTime = System.nanoTime();

            System.out.printf("ConcurrentLinkedQueue: %.2f ms%n",
                    (endTime - startTime) / 1_000_000.0);
        }

        private void testLinkedBlockingQueue(int operationCount, int producerCount, int consumerCount) {
            LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
            CountDownLatch startLatch = new CountDownLatch(1);
            CountDownLatch endLatch = new CountDownLatch(producerCount + consumerCount);
            int perProducer = operationCount / producerCount;
            // Consumers claim items from this shared budget, so the number of
            // take() calls always equals the number of items produced, even
            // when the producer and consumer counts do not divide evenly.
            AtomicInteger remaining = new AtomicInteger(perProducer * producerCount);

            // Producers.
            for (int i = 0; i < producerCount; i++) {
                new Thread(() -> {
                    try {
                        startLatch.await();
                        for (int j = 0; j < perProducer; j++) {
                            queue.put(j);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        endLatch.countDown();
                    }
                }).start();
            }

            // Consumers.
            for (int i = 0; i < consumerCount; i++) {
                new Thread(() -> {
                    try {
                        startLatch.await();
                        while (remaining.getAndDecrement() > 0) {
                            queue.take();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        endLatch.countDown();
                    }
                }).start();
            }

            long startTime = System.nanoTime();
            startLatch.countDown();
            try {
                endLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            long endTime = System.nanoTime();

            System.out.printf("LinkedBlockingQueue: %.2f ms%n",
                    (endTime - startTime) / 1_000_000.0);
        }
    }
}

🏃‍♂️ 并发跳表集合

ConcurrentSkipListMap深度解析

public class ConcurrentSkipListMapApplications {

// 1. 时间窗口计数器
/**
 * Counts events in fixed-size time windows. A window's key is the epoch
 * millisecond time divided by the window size; a background task
 * periodically discards windows older than ten window lengths.
 */
public static class TimeWindowCounter {
    private final ConcurrentSkipListMap<Long, AtomicInteger> counterMap =
            new ConcurrentSkipListMap<>();
    private final long windowSizeMillis;
    private final ScheduledExecutorService cleanupExecutor;

    /**
     * @param windowSizeMillis length of one window in milliseconds; also the cleanup period
     */
    public TimeWindowCounter(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
        this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor();

        // Sweep stale windows once per window length.
        Runnable sweep = this::cleanupExpiredWindows;
        cleanupExecutor.scheduleAtFixedRate(
                sweep, windowSizeMillis, windowSizeMillis, TimeUnit.MILLISECONDS);
    }

    /** Adds one to the current window's counter. */
    public void increment() {
        AtomicInteger bucket =
                counterMap.computeIfAbsent(getCurrentTimeWindow(), w -> new AtomicInteger(0));
        bucket.incrementAndGet();
    }

    /** Returns the count recorded so far in the current window. */
    public long getCountInCurrentWindow() {
        AtomicInteger bucket = counterMap.get(getCurrentTimeWindow());
        if (bucket == null) {
            return 0;
        }
        return bucket.get();
    }

    /** Sums the counts of the current window and the {@code n - 1} windows before it. */
    public long getCountInLastNWindows(int n) {
        final long newest = getCurrentTimeWindow();
        long sum = 0;

        for (int back = 0; back < n; back++) {
            AtomicInteger bucket = counterMap.get(newest - back);
            if (bucket != null) {
                sum += bucket.get();
            }
        }

        return sum;
    }

    /** Returns a sorted copy of window key -> count. */
    public Map<Long, Integer> getSnapshot() {
        Map<Long, Integer> copy = new TreeMap<>();
        for (Map.Entry<Long, AtomicInteger> window : counterMap.entrySet()) {
            copy.put(window.getKey(), window.getValue().get());
        }
        return copy;
    }

    private long getCurrentTimeWindow() {
        return System.currentTimeMillis() / windowSizeMillis;
    }

    /** Drops every window older than ten window lengths. */
    private void cleanupExpiredWindows() {
        long oldestKept = (System.currentTimeMillis() - windowSizeMillis * 10) / windowSizeMillis;

        // headMap is a live view: clearing it removes the stale windows from counterMap.
        counterMap.headMap(oldestKept).clear();
    }

    /** Stops the background cleanup task. */
    public void shutdown() {
        cleanupExecutor.shutdown();
    }
}

// 2. 实时排行榜
public static class LeaderBoard {
private final ConcurrentSkipListMap<Double, String> sortedScores =
new ConcurrentSkipListMap<>(Comparator.reverseOrder());
private final ConcurrentSkipListMap<String, Double> userScores =
new ConcurrentSkipListMap<>();

public void updateScore(String user, double newScore) {
Double oldScore = userScores.get(user);

// 更新用户分数
userScores.put(user, newScore);

// 从排行榜中移除旧分数
if (oldScore != null) {
// 注意:可能多个用户有相同分数,需要小心删除
removeUserFromScoreList(user, oldScore);
}

// 添加新分数到排行榜
sortedScores.computeIfAbsent(newScore, k -> user).compareTo(user);
}

private void removeUserFromScoreList(String user, double score) {
ConcurrentNavigableMap<Double, String> subMap = sortedScores.subMap(score, true, score, true);
for (Map.Entry<Double, String> entry : subMap.entrySet()) {
if (entry.getValue().equals(user)) {
subMap.remove(entry.getKey(), entry.getValue());
break;
}
}
}

public List<String> getTopPlayers(int n) {
List<String> topPlayers = new ArrayList<>();
int count = 0;

for (Map.Entry<Double, String> entry : sortedScores.entrySet()) {
if (count >= n) break;
topPlayers.add(entry.getValue());
count++;
}

return topPlayers;
}

public int getUserRank(String user) {
Double userScore = userScores.get(user);
if (userScore == null) return -1;

int rank = 1;
for (Map.Entry<Double, String> entry : sortedScores.entrySet()) {
if (entry.getValue().equals(user)) {
return rank;
}
rank++;
}

return -1;
}

public Double getUserScore(String user) {
return userScores.get(user);
}
}

// 3. Delayed task scheduler
/**
 * Runs tasks on a single worker thread once their due time (epoch milliseconds) has passed.
 *
 * <p>Every task is stored under its own (due time, sequence) key. Scheduling is therefore
 * a single atomic map insertion, and the worker claims each task individually before
 * running it, so a task scheduled while others are executing can neither be lost nor
 * run twice.
 */
public static class DelayedTaskScheduler {
    /** Upper bound for one sleep of the worker loop, so that stop() is noticed promptly. */
    private static final long MAX_SLEEP_MILLIS = 1000;

    private final ConcurrentSkipListMap<TaskKey, Runnable> scheduleMap =
            new ConcurrentSkipListMap<>();
    // Disambiguates tasks that share the same due time and keeps them in submission order.
    private final AtomicLong sequence = new AtomicLong();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /**
     * Schedules {@code task} to run {@code delayMillis} milliseconds from now.
     *
     * @throws NullPointerException if {@code task} is null
     */
    public void scheduleTask(Runnable task, long delayMillis) {
        scheduleTaskAtTime(task, System.currentTimeMillis() + delayMillis);
    }

    /**
     * Schedules {@code task} to run once the wall clock reaches {@code executeTimeMillis}.
     *
     * @throws NullPointerException if {@code task} is null
     */
    public void scheduleTaskAtTime(Runnable task, long executeTimeMillis) {
        scheduleMap.put(new TaskKey(executeTimeMillis, sequence.getAndIncrement()), task);
    }

    /** Starts the worker loop; calling it again while running has no effect. */
    public void start() {
        if (isRunning.compareAndSet(false, true)) {
            scheduler.submit(this::executeScheduledTasks);
        }
    }

    /** Worker loop: run everything that is due, then sleep until the next task or the cap. */
    private void executeScheduledTasks() {
        while (isRunning.get()) {
            runDueTasks(System.currentTimeMillis());

            try {
                long sleepMillis = millisUntilNextTask();
                if (sleepMillis > 0) {
                    Thread.sleep(sleepMillis);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /** Claims and runs, in key order, every task whose due time is not after {@code now}. */
    private void runDueTasks(long now) {
        Map.Entry<TaskKey, Runnable> head;
        while ((head = scheduleMap.firstEntry()) != null
                && head.getKey().executeTime <= now) {
            // Remove exactly this entry before running it; only the claimed task is executed.
            if (!scheduleMap.remove(head.getKey(), head.getValue())) {
                continue;
            }
            try {
                head.getValue().run();
            } catch (Exception e) {
                // A failing task must not stop the worker loop.
                e.printStackTrace();
            }
        }
    }

    /** Returns how long the worker may sleep: 0 when a task is already due, capped otherwise. */
    private long millisUntilNextTask() {
        Map.Entry<TaskKey, Runnable> head = scheduleMap.firstEntry();
        if (head == null) {
            return MAX_SLEEP_MILLIS;
        }
        long now = System.currentTimeMillis();
        long dueTime = head.getKey().executeTime;
        if (dueTime <= now) {
            return 0;
        }
        return Math.min(dueTime - now, MAX_SLEEP_MILLIS);
    }

    /** Asks the worker loop to finish and shuts the executor down. */
    public void stop() {
        isRunning.set(false);
        scheduler.shutdown();
    }

    /** Returns the number of tasks that have been scheduled but not yet claimed for execution. */
    public int getPendingTaskCount() {
        return scheduleMap.size();
    }

    /**
     * Returns the earliest due time that is not before the current time.
     *
     * @return the due time in epoch milliseconds, or {@code null} when no such task exists
     */
    public Long getNextTaskTime() {
        TaskKey probe = new TaskKey(System.currentTimeMillis(), Long.MIN_VALUE);
        TaskKey next = scheduleMap.ceilingKey(probe);
        return next == null ? null : next.executeTime;
    }

    /** Map key ordering tasks by due time, then by submission sequence. */
    private static final class TaskKey implements Comparable<TaskKey> {
        private final long executeTime;
        private final long sequence;

        TaskKey(long executeTime, long sequence) {
            this.executeTime = executeTime;
            this.sequence = sequence;
        }

        @Override
        public int compareTo(TaskKey other) {
            int byTime = Long.compare(executeTime, other.executeTime);
            return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TaskKey)) {
                return false;
            }
            TaskKey other = (TaskKey) obj;
            return executeTime == other.executeTime && sequence == other.sequence;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(executeTime) + Long.hashCode(sequence);
        }
    }
}

// 4. Range query example
/**
 * Time-indexed store answering range queries. Every query returns an independent
 * {@link TreeMap} copy sorted by timestamp.
 */
public static class RangeQueryExample {
    private final ConcurrentSkipListMap<Long, String> dataMap =
            new ConcurrentSkipListMap<>();

    /** Stores {@code data} under {@code timestamp}, replacing any value already stored there. */
    public void addData(long timestamp, String data) {
        dataMap.put(timestamp, data);
    }

    /**
     * Returns the entries whose timestamp lies in {@code [startTime, endTime]}.
     *
     * @throws IllegalArgumentException if {@code startTime} is greater than {@code endTime}
     */
    public Map<Long, String> getDataInRange(long startTime, long endTime) {
        return new TreeMap<>(dataMap.subMap(startTime, true, endTime, true));
    }

    /**
     * Returns the {@code count} entries with the largest timestamps.
     *
     * <p>The selection is by number of entries, not by a time window, and an empty store
     * yields an empty map.
     */
    public Map<Long, String> getLatestData(int count) {
        // Walk from the newest entry backwards; the TreeMap restores ascending order.
        return takeFirst(dataMap.descendingMap(), count);
    }

    /** Returns the {@code count} entries with the smallest timestamps. */
    public Map<Long, String> getEarliestData(int count) {
        return takeFirst(dataMap, count);
    }

    /** Returns the entries strictly after {@code timestamp}. */
    public Map<Long, String> getDataAfter(long timestamp) {
        return new TreeMap<>(dataMap.tailMap(timestamp, false));
    }

    /** Returns the entries strictly before {@code timestamp}. */
    public Map<Long, String> getDataBefore(long timestamp) {
        return new TreeMap<>(dataMap.headMap(timestamp, false));
    }

    /** Copies at most {@code count} entries, in the iteration order of {@code ordered}. */
    private static Map<Long, String> takeFirst(Map<Long, String> ordered, int count) {
        Map<Long, String> result = new TreeMap<>();
        for (Map.Entry<Long, String> entry : ordered.entrySet()) {
            if (result.size() >= count) {
                break;
            }
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
}

ConcurrentSkipListSet特性

public class ConcurrentSkipListSetApplications {

// 1. Operations on a concurrent sorted set
/**
 * Thin wrapper around a {@link ConcurrentSkipListSet} of integers. Range queries return
 * independent sorted copies rather than live views.
 */
public static class SortedSetOperations {
    private final ConcurrentSkipListSet<Integer> sortedSet =
            new ConcurrentSkipListSet<>();

    /** Adds every given number; duplicates are ignored by the set. */
    public void addNumbers(int... numbers) {
        for (int index = 0; index < numbers.length; index++) {
            sortedSet.add(numbers[index]);
        }
    }

    /** Returns a copy of the elements in {@code [start, end]}. */
    public Set<Integer> getRange(int start, int end) {
        TreeSet<Integer> copy = new TreeSet<>();
        copy.addAll(sortedSet.subSet(start, true, end, true));
        return copy;
    }

    /** Returns a copy of the elements strictly greater than {@code threshold}. */
    public Set<Integer> getGreaterThan(int threshold) {
        TreeSet<Integer> copy = new TreeSet<>();
        copy.addAll(sortedSet.tailSet(threshold, false));
        return copy;
    }

    /** Returns a copy of the elements strictly less than {@code threshold}. */
    public Set<Integer> getLessThan(int threshold) {
        TreeSet<Integer> copy = new TreeSet<>();
        copy.addAll(sortedSet.headSet(threshold, false));
        return copy;
    }

    /** Returns the smallest element; the underlying set throws when it is empty. */
    public Integer getFirst() {
        Integer smallest = sortedSet.first();
        return smallest;
    }

    /** Returns the largest element; the underlying set throws when it is empty. */
    public Integer getLast() {
        Integer largest = sortedSet.last();
        return largest;
    }

    /** Returns the greatest element not above {@code element}, or {@code null} if none. */
    public Integer getFloor(Integer element) {
        Integer atOrBelow = sortedSet.floor(element);
        return atOrBelow;
    }

    /** Returns the smallest element not below {@code element}, or {@code null} if none. */
    public Integer getCeiling(Integer element) {
        Integer atOrAbove = sortedSet.ceiling(element);
        return atOrAbove;
    }
}

// 2. Concurrent priority task queue
/**
 * Priority queue built on a {@link ConcurrentSkipListSet}.
 *
 * <p>Tasks are ordered by priority (smaller number first), then by submission time, then
 * by a creation sequence number. The sequence number is the final tiebreaker: without it
 * two tasks with the same priority created in the same millisecond compare as equal and
 * the set silently rejects the second one.
 */
public static class PriorityTaskQueue {
    private final ConcurrentSkipListSet<PriorityTask> taskSet =
            new ConcurrentSkipListSet<>(Comparator.comparingInt(PriorityTask::getPriority)
                    .thenComparingLong(PriorityTask::getSubmissionTime)
                    .thenComparingLong(PriorityTask::getSequence));

    /** Adds {@code task} to the queue. */
    public void addTask(PriorityTask task) {
        taskSet.add(task);
    }

    /**
     * Removes and returns the task with the highest priority (smallest number).
     *
     * @return the next task, or {@code null} when the queue is empty
     */
    public PriorityTask getNextTask() {
        return taskSet.pollFirst();
    }

    /** Removes {@code task}; returns whether it was queued. */
    public boolean removeTask(PriorityTask task) {
        return taskSet.remove(task);
    }

    /** Returns the number of queued tasks. */
    public int getTaskCount() {
        return taskSet.size();
    }

    /** Returns a copy of the queued tasks in execution order. */
    public List<PriorityTask> getAllTasks() {
        return new ArrayList<>(taskSet);
    }

    /** A unit of work tagged with a priority and the moment it was created. */
    static class PriorityTask {
        // Monotonic counter shared by all tasks; gives each one a unique ordering key.
        private static final AtomicLong SEQUENCE = new AtomicLong();

        private final String id;
        private final int priority; // smaller number = higher priority
        private final long submissionTime;
        private final long sequence;
        private final Runnable task;

        public PriorityTask(String id, int priority, Runnable task) {
            this.id = id;
            this.priority = priority;
            this.submissionTime = System.currentTimeMillis();
            this.sequence = SEQUENCE.getAndIncrement();
            this.task = task;
        }

        public String getId() { return id; }
        public int getPriority() { return priority; }
        public long getSubmissionTime() { return submissionTime; }
        /** Returns the creation sequence number used to break ordering ties. */
        public long getSequence() { return sequence; }
        public Runnable getTask() { return task; }
    }
}
}

📊 性能对比与选择

并发集合性能特征

public class ConcurrentCollectionComparison {

// Performance test harness
/**
 * Micro-benchmark comparing concurrent collections with their synchronized wrappers.
 *
 * <p>Each scenario starts {@link #THREAD_COUNT} threads that each perform
 * {@link #OPERATIONS_PER_THREAD} operations on one shared collection. The measured time
 * runs from just before the first thread is started until the last thread has finished.
 */
public static class PerformanceTester {
    private static final int THREAD_COUNT = 8;
    private static final int OPERATIONS_PER_THREAD = 100000;
    // Percentage of list operations that are reads; the rest are appends.
    private static final int LIST_READ_PERCENT = 90;

    /** Work executed by one benchmark thread. */
    private interface Workload {
        void run(int threadId);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("并发集合性能对比测试");
        System.out.println("=".repeat(50));

        testMapPerformance();
        testListPerformance();
        testQueuePerformance();
        testSetPerformance();
    }

    /** Compares ConcurrentHashMap with a synchronized HashMap. */
    private static void testMapPerformance() throws InterruptedException {
        System.out.println("\nMap性能测试:");

        long chmTime = testMap(() -> new ConcurrentHashMap<>(), "ConcurrentHashMap");
        long syncMapTime = testMap(
                () -> Collections.synchronizedMap(new HashMap<String, Integer>()),
                "SynchronizedHashMap");

        System.out.printf("性能提升: %.1fx%n", (double) syncMapTime / chmTime);
    }

    /** Runs a 50/50 put/get mix over 10000 keys; returns the elapsed nanoseconds. */
    private static long testMap(Supplier<Map<String, Integer>> mapSupplier, String name)
            throws InterruptedException {
        Map<String, Integer> map = mapSupplier.get();

        return runBenchmark(name, threadId -> {
            // Seeded per thread so that both map implementations see the same key sequence.
            Random random = new Random(threadId);
            for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                String key = "key-" + random.nextInt(10000);
                if (random.nextBoolean()) {
                    map.put(key, random.nextInt());
                } else {
                    map.get(key);
                }
            }
        });
    }

    /** Compares CopyOnWriteArrayList with a synchronized ArrayList on a read-heavy mix. */
    private static void testListPerformance() throws InterruptedException {
        System.out.println("\nList性能测试:");

        long cowTime = testList(() -> new CopyOnWriteArrayList<>(), "CopyOnWriteArrayList");
        long syncListTime = testList(
                () -> Collections.synchronizedList(new ArrayList<String>()),
                "SynchronizedArrayList");

        System.out.printf("读性能提升: %.1fx%n", (double) syncListTime / cowTime);
    }

    /**
     * Runs a read-heavy workload ({@link #LIST_READ_PERCENT}% random reads, the rest
     * appends) on a list prefilled with 1000 items; returns the elapsed nanoseconds.
     */
    private static long testList(Supplier<List<String>> listSupplier, String name)
            throws InterruptedException {
        List<String> list = listSupplier.get();

        // Prefill so that reads always have something to hit.
        for (int i = 0; i < 1000; i++) {
            list.add("item-" + i);
        }

        return runBenchmark(name, threadId -> {
            Random random = new Random();
            for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                if (random.nextInt(100) < LIST_READ_PERCENT) {
                    // The list only grows, so an index below the current size stays valid.
                    list.get(random.nextInt(list.size()));
                } else {
                    list.add("new-item-" + random.nextInt(1000));
                }
            }
        });
    }

    /** Compares ConcurrentLinkedQueue with LinkedBlockingQueue. */
    private static void testQueuePerformance() throws InterruptedException {
        System.out.println("\nQueue性能测试:");

        long clqTime = testQueue(() -> new ConcurrentLinkedQueue<>(), "ConcurrentLinkedQueue");
        long lbqTime = testQueue(() -> new LinkedBlockingQueue<>(), "LinkedBlockingQueue");

        System.out.printf("非阻塞性能提升: %.1fx%n", (double) lbqTime / clqTime);
    }

    /** Runs a 50/50 offer/poll mix; returns the elapsed nanoseconds. */
    private static long testQueue(Supplier<Queue<Integer>> queueSupplier, String name)
            throws InterruptedException {
        Queue<Integer> queue = queueSupplier.get();

        return runBenchmark(name, threadId -> {
            Random random = new Random();
            for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                if (random.nextBoolean()) {
                    queue.offer(random.nextInt());
                } else {
                    queue.poll();
                }
            }
        });
    }

    /** Compares ConcurrentHashMap.newKeySet() with a synchronized HashSet. */
    private static void testSetPerformance() throws InterruptedException {
        System.out.println("\nSet性能测试:");

        long chmSetTime = testSet(() -> ConcurrentHashMap.newKeySet(), "ConcurrentHashMap.KeySet");
        long syncSetTime = testSet(
                () -> Collections.synchronizedSet(new HashSet<Integer>()),
                "SynchronizedHashSet");

        System.out.printf("性能提升: %.1fx%n", (double) syncSetTime / chmSetTime);
    }

    /** Runs a 50/50 add/contains mix over 10000 values; returns the elapsed nanoseconds. */
    private static long testSet(Supplier<Set<Integer>> setSupplier, String name)
            throws InterruptedException {
        Set<Integer> set = setSupplier.get();

        return runBenchmark(name, threadId -> {
            Random random = new Random();
            for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                if (random.nextBoolean()) {
                    set.add(random.nextInt(10000));
                } else {
                    set.contains(random.nextInt(10000));
                }
            }
        });
    }

    /**
     * Runs {@code workload} on {@link #THREAD_COUNT} threads, waits for all of them and
     * prints the elapsed time under {@code name}.
     *
     * @return the elapsed time in nanoseconds
     */
    private static long runBenchmark(String name, Workload workload)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        long startTime = System.nanoTime();

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    workload.run(threadId);
                } finally {
                    // Count down even when the workload throws, so await() cannot hang.
                    latch.countDown();
                }
            }).start();
        }

        latch.await();
        long elapsed = System.nanoTime() - startTime;

        System.out.printf("%s: %.2f ms%n", name, elapsed / 1_000_000.0);
        return elapsed;
    }
}
}

选择决策矩阵

public class ConcurrentCollectionSelector {

/**
 * Requirements used to pick a concurrent collection.
 *
 * <p>The eight-argument constructor is kept for existing callers and leaves the fairness
 * and deque requirements switched off.
 */
public static class SelectionCriteria {
    private final boolean requiresOrdering;
    private final boolean requiresBlocking;
    private final boolean requiresBounded;
    private final double readWriteRatio; // fraction of operations that are reads
    private final boolean requiresRangeQueries;
    private final boolean requiresAtomicOperations;
    private final int expectedSize;
    private final int concurrencyLevel;
    private final boolean requiresFairness;
    private final boolean requiresDeque;

    public SelectionCriteria(boolean requiresOrdering, boolean requiresBlocking,
            boolean requiresBounded, double readWriteRatio,
            boolean requiresRangeQueries, boolean requiresAtomicOperations,
            int expectedSize, int concurrencyLevel) {
        this(requiresOrdering, requiresBlocking, requiresBounded, readWriteRatio,
                requiresRangeQueries, requiresAtomicOperations, expectedSize,
                concurrencyLevel, false, false);
    }

    /**
     * @param requiresFairness whether blocked threads must be served in arrival order
     * @param requiresDeque whether elements must be insertable/removable at both ends
     */
    public SelectionCriteria(boolean requiresOrdering, boolean requiresBlocking,
            boolean requiresBounded, double readWriteRatio,
            boolean requiresRangeQueries, boolean requiresAtomicOperations,
            int expectedSize, int concurrencyLevel,
            boolean requiresFairness, boolean requiresDeque) {
        this.requiresOrdering = requiresOrdering;
        this.requiresBlocking = requiresBlocking;
        this.requiresBounded = requiresBounded;
        this.readWriteRatio = readWriteRatio;
        this.requiresRangeQueries = requiresRangeQueries;
        this.requiresAtomicOperations = requiresAtomicOperations;
        this.expectedSize = expectedSize;
        this.concurrencyLevel = concurrencyLevel;
        this.requiresFairness = requiresFairness;
        this.requiresDeque = requiresDeque;
    }

    // Getters
    public boolean requiresOrdering() { return requiresOrdering; }
    public boolean requiresBlocking() { return requiresBlocking; }
    public boolean requiresBounded() { return requiresBounded; }
    public double getReadWriteRatio() { return readWriteRatio; }
    public boolean requiresRangeQueries() { return requiresRangeQueries; }
    public boolean requiresAtomicOperations() { return requiresAtomicOperations; }
    public int getExpectedSize() { return expectedSize; }
    public int getConcurrencyLevel() { return concurrencyLevel; }
    public boolean requiresFairness() { return requiresFairness; }
    public boolean requiresDeque() { return requiresDeque; }
}

/**
 * Maps {@link SelectionCriteria} to a recommendation string of the form
 * "CollectionName - reason".
 */
public static class ConcurrentCollectionRecommender {

    /** Recommends a map: the skip-list map when order or range queries matter, else the hash map. */
    public static String recommendMap(SelectionCriteria criteria) {
        if (criteria.requiresOrdering() || criteria.requiresRangeQueries()) {
            return "ConcurrentSkipListMap - 有序支持,范围查询性能好";
        } else if (criteria.requiresAtomicOperations()) {
            return "ConcurrentHashMap - 高性能,支持原子操作";
        } else {
            return "ConcurrentHashMap - 通用并发Map,性能优秀";
        }
    }

    /** Recommends a sequential container; a read share above 0.8 selects copy-on-write. */
    public static String recommendList(SelectionCriteria criteria) {
        if (criteria.getReadWriteRatio() > 0.8) {
            return "CopyOnWriteArrayList - 读多写少,读操作无锁";
        } else if (criteria.requiresBlocking()) {
            if (criteria.requiresBounded()) {
                return "ArrayBlockingQueue - 有界阻塞,适合生产者-消费者";
            } else {
                return "LinkedBlockingQueue - 无界阻塞,容量灵活";
            }
        } else {
            return "ConcurrentLinkedQueue - 无锁非阻塞,高性能";
        }
    }

    /** Recommends a set: skip-list for ordering, copy-on-write for read-heavy use, else hash-based. */
    public static String recommendSet(SelectionCriteria criteria) {
        if (criteria.requiresOrdering() || criteria.requiresRangeQueries()) {
            return "ConcurrentSkipListSet - 有序支持,范围查询";
        } else if (criteria.getReadWriteRatio() > 0.8) {
            return "CopyOnWriteArraySet - 读多写少,基于COW";
        } else {
            return "ConcurrentHashMap.newKeySet() - 高性能并发Set";
        }
    }

    /** Recommends a queue based on the blocking and bounded requirements. */
    public static String recommendQueue(SelectionCriteria criteria) {
        if (criteria.requiresBlocking()) {
            if (criteria.requiresBounded()) {
                return "ArrayBlockingQueue - 有界阻塞,容量固定";
            } else {
                return "LinkedBlockingQueue - 无界阻塞,吞吐量高";
            }
        } else {
            return "ConcurrentLinkedQueue - 无锁非阻塞,延迟低";
        }
    }

    /** Finer-grained decision tree that also looks at size, concurrency, fairness and deque needs. */
    public static class DecisionTree {

        public static String decideMap(SelectionCriteria criteria) {
            if (criteria.requiresOrdering()) {
                // Ordered maps
                if (criteria.requiresRangeQueries()) {
                    return "ConcurrentSkipListMap - 支持范围查询和有序遍历";
                } else {
                    return "ConcurrentSkipListMap - 有序Map,性能稳定";
                }
            } else {
                // Unordered maps
                if (criteria.getExpectedSize() > 1000000) {
                    return "ConcurrentHashMap - 大数据量,内存效率高";
                } else if (criteria.getConcurrencyLevel() > 50) {
                    return "ConcurrentHashMap - 高并发场景优化";
                } else {
                    return "ConcurrentHashMap - 通用高性能Map";
                }
            }
        }

        public static String decideList(SelectionCriteria criteria) {
            if (criteria.getReadWriteRatio() > 0.9) {
                // Reads dominate
                return "CopyOnWriteArrayList - 99%读操作,写时复制";
            } else if (criteria.getReadWriteRatio() > 0.7) {
                // Mostly reads: the copy cost on write depends on the list size
                if (criteria.getExpectedSize() < 1000) {
                    return "CopyOnWriteArrayList - 中小数据量,写操作可接受";
                } else {
                    return "Vector 或 Collections.synchronizedList - 大数据量考虑内存使用";
                }
            } else {
                // Balanced reads and writes
                if (criteria.requiresBlocking()) {
                    return "LinkedBlockingQueue - 阻塞语义,自动背压";
                } else {
                    return "ConcurrentLinkedQueue - 非阻塞,高性能";
                }
            }
        }

        public static String decideQueue(SelectionCriteria criteria) {
            if (criteria.requiresBlocking()) {
                // Blocking semantics required
                if (criteria.requiresBounded()) {
                    if (criteria.getConcurrencyLevel() > 20) {
                        return "LinkedBlockingQueue - 公平性好,适合高并发";
                    } else {
                        return "ArrayBlockingQueue - 内存紧凑,性能稳定";
                    }
                } else {
                    if (criteria.requiresFairness()) {
                        // Among the JDK blocking queues only ArrayBlockingQueue offers a
                        // fairness policy, and it needs a capacity.
                        return "ArrayBlockingQueue(capacity, true) - 公平锁支持,需指定容量";
                    } else {
                        return "LinkedBlockingQueue - 高吞吐量";
                    }
                }
            } else {
                // Non-blocking
                if (criteria.requiresDeque()) {
                    return "ConcurrentLinkedDeque - 双端队列支持";
                } else {
                    return "ConcurrentLinkedQueue - 高性能单端队列";
                }
            }
        }
    }
}

// Worked examples
/** Prints the recommendation for three typical scenarios. */
public static class RealWorldExamples {

    public static void main(String[] args) {
        // Scenario 1: high-concurrency cache -
        // unordered, non-blocking, unbounded, 80% reads, no range queries,
        // atomic operations needed, ~100000 entries, concurrency level 16.
        SelectionCriteria cacheCriteria = new SelectionCriteria(
                false, false, false, 0.8, false, true, 100000, 16);

        // Scenario 2: event listeners -
        // unordered, non-blocking, unbounded, 95% reads, no range queries,
        // no atomic operations, ~1000 entries, concurrency level 8.
        SelectionCriteria eventCriteria = new SelectionCriteria(
                false, false, false, 0.95, false, false, 1000, 8);

        // Scenario 3: task queue -
        // unordered, blocking, bounded, balanced reads/writes, no range queries,
        // no atomic operations, ~10000 entries, concurrency level 4.
        SelectionCriteria taskQueueCriteria = new SelectionCriteria(
                false, true, true, 0.5, false, false, 10000, 4);

        String cacheAdvice = ConcurrentCollectionRecommender.recommendMap(cacheCriteria);
        System.out.println("缓存场景推荐: " + cacheAdvice);

        String listenerAdvice = ConcurrentCollectionRecommender.recommendList(eventCriteria);
        System.out.println("事件监听器场景推荐: " + listenerAdvice);

        String queueAdvice = ConcurrentCollectionRecommender.recommendQueue(taskQueueCriteria);
        System.out.println("任务队列场景推荐: " + queueAdvice);
    }
}
}

🏭 实战应用场景

1. 分布式缓存实现

public class DistributedCacheImplementation {

/**
 * In-memory cache with a size limit, write/access expiry and least-recently-used eviction.
 *
 * <p>Recency is tracked with a monotonically increasing access counter stored on each
 * entry. When the cache grows past {@code maxSize} the entries with the oldest counters
 * are removed until the cache is back at 80% of {@code maxSize}. Expired entries are
 * dropped lazily on access and once a minute by a background daemon thread.
 */
public static class DistributedCache<K, V> {
    /** Fraction of {@code maxSize} the cache is trimmed to when it overflows. */
    private static final double EVICTION_TARGET_RATIO = 0.8;
    private static final long CLEANUP_INTERVAL_MILLIS = 60000;

    private final ConcurrentHashMap<K, CacheEntry<V>> cache;
    // Logical clock ordering accesses; finer-grained than millisecond timestamps.
    private final AtomicLong accessClock = new AtomicLong();
    // Ensures that only one thread trims the cache at a time.
    private final AtomicBoolean evicting = new AtomicBoolean(false);
    private final ScheduledExecutorService cleanupExecutor;
    private final int maxSize;
    private final long expireAfterWriteMillis;
    private final long expireAfterAccessMillis;

    /**
     * @param maxSize maximum number of entries before eviction starts
     * @param expireAfterWriteMillis lifetime after the write; 0 or less disables it
     * @param expireAfterAccessMillis lifetime after the last access; 0 or less disables it
     */
    public DistributedCache(int maxSize, long expireAfterWriteMillis, long expireAfterAccessMillis) {
        this.maxSize = maxSize;
        this.expireAfterWriteMillis = expireAfterWriteMillis;
        this.expireAfterAccessMillis = expireAfterAccessMillis;
        this.cache = new ConcurrentHashMap<>(maxSize);
        // Daemon thread: a forgotten shutdown() must not keep the JVM alive.
        this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "cache-cleanup");
            worker.setDaemon(true);
            return worker;
        });

        startCleanupTask();
    }

    /** Stores {@code value} under {@code key} and trims the cache if it exceeds its limit. */
    public void put(K key, V value) {
        long currentTime = System.currentTimeMillis();
        cache.put(key, new CacheEntry<V>(value, currentTime, currentTime,
                accessClock.incrementAndGet()));
        enforceCapacity();
    }

    /**
     * Returns the value cached under {@code key} and marks it as recently used.
     *
     * @return the value, or {@code null} when the key is absent or its entry has expired
     */
    public V get(K key) {
        CacheEntry<V> entry = cache.get(key);
        if (entry == null) {
            return null;
        }

        long currentTime = System.currentTimeMillis();
        if (isExpired(entry, currentTime)) {
            // Conditional remove: never discard an entry that replaced the expired one.
            cache.remove(key, entry);
            return null;
        }

        entry.touch(currentTime, accessClock.incrementAndGet());
        return entry.getValue();
    }

    /**
     * Returns the live value for {@code key}, computing and caching it when the key is
     * absent or its entry has expired.
     */
    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        long currentTime = System.currentTimeMillis();

        // Fast path: a live entry needs no map-level locking.
        CacheEntry<V> cached = cache.get(key);
        if (cached != null && !isExpired(cached, currentTime)) {
            cached.touch(currentTime, accessClock.incrementAndGet());
            return cached.getValue();
        }

        CacheEntry<V> result = cache.compute(key, (k, existing) -> {
            if (existing != null && !isExpired(existing, currentTime)) {
                // Another thread filled the slot in the meantime.
                existing.touch(currentTime, accessClock.incrementAndGet());
                return existing;
            }
            V value = mappingFunction.apply(k);
            return new CacheEntry<V>(value, currentTime, currentTime,
                    accessClock.incrementAndGet());
        });
        enforceCapacity();
        return result.getValue();
    }

    /** Removes the entry for {@code key}, if any. */
    public void remove(K key) {
        cache.remove(key);
    }

    /** Removes every entry. */
    public void clear() {
        cache.clear();
    }

    /** Returns the number of entries, including expired ones that are not yet cleaned up. */
    public int size() {
        return cache.size();
    }

    /** Returns a copy of all entries that are not expired at the time of the call. */
    public Map<K, V> getSnapshot() {
        long currentTime = System.currentTimeMillis();
        Map<K, V> snapshot = new HashMap<>();

        cache.forEach((key, entry) -> {
            if (!isExpired(entry, currentTime)) {
                snapshot.put(key, entry.getValue());
            }
        });

        return snapshot;
    }

    private boolean isExpired(CacheEntry<?> entry, long currentTime) {
        if (expireAfterWriteMillis > 0 &&
                currentTime - entry.getWriteTime() > expireAfterWriteMillis) {
            return true;
        }

        if (expireAfterAccessMillis > 0 &&
                currentTime - entry.getLastAccessTime() > expireAfterAccessMillis) {
            return true;
        }

        return false;
    }

    private void enforceCapacity() {
        if (cache.size() > maxSize) {
            evictLRU();
        }
    }

    /** Removes the least recently used entries until the cache is at 80% of its limit. */
    private void evictLRU() {
        if (!evicting.compareAndSet(false, true)) {
            return; // another thread is already trimming
        }
        try {
            int targetSize = (int) (maxSize * EVICTION_TARGET_RATIO);
            int excess = cache.size() - targetSize;
            if (excess <= 0) {
                return;
            }

            // Snapshot the access order first: sorting on the live, concurrently changing
            // counters could violate the comparator contract.
            List<EvictionCandidate<K, V>> candidates = new ArrayList<>(cache.size());
            cache.forEach((key, entry) ->
                    candidates.add(new EvictionCandidate<K, V>(key, entry)));
            candidates.sort((a, b) -> Long.compare(a.accessOrder, b.accessOrder));

            int limit = Math.min(excess, candidates.size());
            for (int i = 0; i < limit; i++) {
                EvictionCandidate<K, V> victim = candidates.get(i);
                // Skip entries that were used again after the snapshot was taken.
                if (victim.entry.getAccessOrder() == victim.accessOrder) {
                    cache.remove(victim.key, victim.entry);
                }
            }
        } finally {
            evicting.set(false);
        }
    }

    private void startCleanupTask() {
        cleanupExecutor.scheduleAtFixedRate(() -> {
            long currentTime = System.currentTimeMillis();
            // The entry-set view removes by key and value, so fresh replacements survive.
            cache.entrySet().removeIf(e -> isExpired(e.getValue(), currentTime));
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }

    /** Stops the background cleanup thread. */
    public void shutdown() {
        cleanupExecutor.shutdown();
    }

    /** Cached value together with its timestamps and its position in the access order. */
    private static class CacheEntry<V> {
        private final V value;
        private final long writeTime;
        private volatile long lastAccessTime;
        private volatile long accessOrder;

        public CacheEntry(V value, long writeTime, long lastAccessTime, long accessOrder) {
            this.value = value;
            this.writeTime = writeTime;
            this.lastAccessTime = lastAccessTime;
            this.accessOrder = accessOrder;
        }

        public V getValue() { return value; }
        public long getWriteTime() { return writeTime; }
        public long getLastAccessTime() { return lastAccessTime; }
        public long getAccessOrder() { return accessOrder; }

        /** Records an access at {@code accessTime} with the given logical clock value. */
        public void touch(long accessTime, long order) {
            this.lastAccessTime = accessTime;
            this.accessOrder = order;
        }
    }

    /** Immutable snapshot of one entry's access order, taken for an eviction pass. */
    private static final class EvictionCandidate<K, V> {
        private final K key;
        private final CacheEntry<V> entry;
        private final long accessOrder;

        EvictionCandidate(K key, CacheEntry<V> entry) {
            this.key = key;
            this.entry = entry;
            this.accessOrder = entry.getAccessOrder();
        }
    }
}

/**
 * Example showing how the cache is created, filled, queried and shut down.
 */
public static class CacheUsageExample {
    public static void main(String[] args) {
        // Limits: 1000 entries, 30 minutes after write, 10 minutes after last access.
        final int capacity = 1000;
        final long writeTtlMillis = TimeUnit.MINUTES.toMillis(30);
        final long accessTtlMillis = TimeUnit.MINUTES.toMillis(10);
        DistributedCache<String, User> userCache =
                new DistributedCache<>(capacity, writeTtlMillis, accessTtlMillis);

        // Load through the cache; the loader stands in for a database lookup.
        User user = userCache.computeIfAbsent("user123", CacheUsageExample::loadUserFromDatabase);
        System.out.println("Loaded user: " + user.getName());

        // Insert an entry directly.
        User alice = new User("user456", "Alice");
        userCache.put("user456", alice);

        // Report the number of cached entries.
        System.out.println("Cache size: " + userCache.size());

        // Release the cache's background resources.
        userCache.shutdown();
    }

    /** Simulates a database read by fabricating a user for the given id. */
    private static User loadUserFromDatabase(String userId) {
        String displayName = "User " + userId;
        return new User(userId, displayName);
    }

    /** Minimal immutable user record used by the example. */
    private static class User {
        private final String id;
        private final String name;

        public User(String id, String name) {
            this.id = id;
            this.name = name;
        }

        public String getId() {
            return id;
        }

        public String getName() {
            return name;
        }
    }
}
}

2. 实时数据分析系统

public class RealTimeAnalyticsSystem {

/**
 * Collects counters, per-dimension tallies and time-series values, and prints a counter
 * report once a minute on a background thread.
 */
public static class MetricsCollector {
    private final ConcurrentHashMap<String, LongAdder> counters = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> dimensions =
            new ConcurrentHashMap<>();
    private final ConcurrentSkipListMap<Long, Map<String, Long>> timeSeriesData =
            new ConcurrentSkipListMap<>();
    private final ScheduledExecutorService reporter;

    public MetricsCollector() {
        this.reporter = Executors.newSingleThreadScheduledExecutor();
        startPeriodicReporting();
    }

    /** Adds one to the counter called {@code name}, creating it on first use. */
    public void incrementCounter(String name) {
        adderFor(name).increment();
    }

    /** Adds {@code delta} to the counter called {@code name}, creating it on first use. */
    public void incrementCounter(String name, long delta) {
        adderFor(name).add(delta);
    }

    private LongAdder adderFor(String name) {
        return counters.computeIfAbsent(name, k -> new LongAdder());
    }

    /** Counts one occurrence of {@code dimensionValue} for {@code metricName}. */
    public void recordDimension(String metricName, String dimensionValue) {
        ConcurrentHashMap<String, Integer> tallies =
                dimensions.computeIfAbsent(metricName, k -> new ConcurrentHashMap<>());
        tallies.merge(dimensionValue, 1, Integer::sum);
    }

    /** Stores {@code value} for {@code metricName} in the bucket of {@code timestamp}. */
    public void recordTimeSeriesValue(long timestamp, String metricName, long value) {
        Map<String, Long> bucket =
                timeSeriesData.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>());
        bucket.put(metricName, value);
    }

    /** Returns the current sum of every counter. */
    public Map<String, Long> getSnapshot() {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, LongAdder> counter : counters.entrySet()) {
            totals.put(counter.getKey(), counter.getValue().sum());
        }
        return totals;
    }

    /** Returns a copy of the dimension tallies, one independent map per metric. */
    public Map<String, Map<String, Integer>> getDimensionSnapshot() {
        Map<String, Map<String, Integer>> copy = new HashMap<>();
        for (Map.Entry<String, ConcurrentHashMap<String, Integer>> metric : dimensions.entrySet()) {
            copy.put(metric.getKey(), new HashMap<>(metric.getValue()));
        }
        return copy;
    }

    /** Sums, per metric, the values of all buckets with a timestamp in {@code [startTime, endTime]}. */
    public Map<String, Long> getMetricsInTimeRange(long startTime, long endTime) {
        Map<String, Long> sums = new HashMap<>();

        for (Map<String, Long> bucket
                : timeSeriesData.subMap(startTime, true, endTime, true).values()) {
            for (Map.Entry<String, Long> sample : bucket.entrySet()) {
                sums.merge(sample.getKey(), sample.getValue(), Long::sum);
            }
        }

        return sums;
    }

    private void startPeriodicReporting() {
        // One report per minute, the first after one minute.
        reporter.scheduleAtFixedRate(this::reportAndCleanup, 60, 60, TimeUnit.SECONDS);
    }

    /** One reporting cycle; failures are printed so that the schedule keeps running. */
    private void reportAndCleanup() {
        try {
            generateReport();
            cleanupOldData();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void generateReport() {
        Map<String, Long> counterSnapshot = getSnapshot();
        long currentTime = System.currentTimeMillis();

        System.out.println("=== 实时指标报告 [" + new Date(currentTime) + "] ===");
        for (Map.Entry<String, Long> counter : counterSnapshot.entrySet()) {
            System.out.printf("%s: %d%n", counter.getKey(), counter.getValue());
        }

        // Persist the same snapshot as a time-series sample.
        for (Map.Entry<String, Long> counter : counterSnapshot.entrySet()) {
            recordTimeSeriesValue(currentTime, counter.getKey(), counter.getValue());
        }
    }

    private void cleanupOldData() {
        long retentionMillis = TimeUnit.HOURS.toMillis(24);
        long cutoffTime = System.currentTimeMillis() - retentionMillis;
        // Drop every bucket older than the cutoff.
        timeSeriesData.headMap(cutoffTime).clear();
    }

    /** Stops the periodic reporter. */
    public void shutdown() {
        reporter.shutdown();
    }
}

/**
* 事件处理管道
*/
public static class EventProcessingPipeline {
private final BlockingQueue<Event> inputQueue;
private final List<BlockingQueue<Event>> stageQueues;
private final List<ExecutorService> stageExecutors;
private final ConcurrentLinkedQueue<Event> outputQueue;
private final AtomicInteger processedCount = new AtomicInteger(0);
private final AtomicInteger errorCount = new AtomicInteger(0);

public EventProcessingPipeline(int queueCapacity, int... stageThreadCounts) {
this.inputQueue = new LinkedBlockingQueue<>(queueCapacity);
this.stageQueues = new ArrayList<>();
this.stageExecutors = new ArrayList<>();
this.outputQueue = new ConcurrentLinkedQueue<>();

// 为每个处理阶段创建队列和线程池
for (int i = 0; i < stageThreadCounts.length; i++) {
stageQueues.add(new LinkedBlockingQueue<>(queueCapacity));
stageExecutors.add(Executors.newFixedThreadPool(stageThreadCounts[i]));
}
}

public void submitEvent(Event event) throws InterruptedException {
inputQueue.put(event);
}

public void start() {
// 启动输入处理线程
new Thread(this::processInputStage).start();

// 启动中间处理阶段
for (int i = 0; i < stageQueues.size(); i++) {
final int stageIndex = i;
new Thread(() -> processMiddleStage(stageIndex)).start();
}

// 启动输出处理线程
new Thread(this::processOutputStage).start();
}

private void processInputStage() {
try {
while (!Thread.currentThread().isInterrupted()) {
Event event = inputQueue.take();
// 预处理
preprocessEvent(event);

if (!stageQueues.isEmpty()) {
stageQueues.get(0).offer(event);
} else {
outputQueue.offer(event);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void processMiddleStage(int stageIndex) {
BlockingQueue<Event> currentQueue = stageQueues.get(stageIndex);
BlockingQueue<Event> nextQueue = stageIndex + 1 < stageQueues.size() ?
stageQueues.get(stageIndex + 1) : null;

ExecutorService executor = stageExecutors.get(stageIndex);

while (!Thread.currentThread().isInterrupted()) {
try {
Event event = currentQueue.take();

executor.submit(() -> {
try {
processEventStage(event, stageIndex);

if (nextQueue != null) {
nextQueue.offer(event);
} else {
outputQueue.offer(event);
}
processedCount.incrementAndGet();
} catch (Exception e) {
errorCount.incrementAndGet();
e.printStackTrace();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

private void processOutputStage() {
try {
while (!Thread.currentThread().isInterrupted()) {
Event event = outputQueue.poll(100, TimeUnit.MILLISECONDS);
if (event != null) {
handleCompletedEvent(event);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void preprocessEvent(Event event) {
// 预处理逻辑
event.setPreprocessTime(System.currentTimeMillis());
}

private void processEventStage(Event event, int stageIndex) {
// 模拟处理时间
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
event.addProcessedStage(stageIndex);
}

private void handleCompletedEvent(Event event) {
event.setCompletedTime(System.currentTimeMillis());
// 保存处理结果或发送到下游系统
}

public int getProcessedCount() {
return processedCount.get();
}

public int getErrorCount() {
return errorCount.get();
}

public void shutdown() {
stageExecutors.forEach(ExecutorService::shutdown);
}

static class Event {
private final String id;
private final Map<String, Object> data;
private long preprocessTime;
private long completedTime;
private final List<Integer> processedStages = new ArrayList<>();

public Event(String id, Map<String, Object> data) {
this.id = id;
this.data = data;
}

public String getId() { return id; }
public Map<String, Object> getData() { return data; }
public long getPreprocessTime() { return preprocessTime; }
public void setPreprocessTime(long preprocessTime) { this.preprocessTime = preprocessTime; }
public long getCompletedTime() { return completedTime; }
public void setCompletedTime(long completedTime) { this.completedTime = completedTime; }
public void addProcessedStage(int stage) { processedStages.add(stage); }
public List<Integer> getProcessedStages() { return new ArrayList<>(processedStages); }
}
}
}

⚠️ 常见陷阱与最佳实践

1. 并发集合使用陷阱

public class ConcurrentCollectionPitfalls {

/**
 * Pitfall 1: trying to modify a collection through its iterator.
 */
public static class IteratorModificationTrap {
    public static void demonstrate() {
        List<String> items = new CopyOnWriteArrayList<>();
        items.add("item1");
        items.add("item2");
        items.add("item3");

        // Wrong: removing through the iterator does not work here.
        Iterator<String> cursor = items.iterator();
        while (cursor.hasNext()) {
            String current = cursor.next();
            if ("item2".equals(current)) {
                // A CopyOnWriteArrayList iterator works on a snapshot; calling
                // cursor.remove() here would throw UnsupportedOperationException.
            }
        }

        // Right: remove through the collection itself.
        items.remove("item2");

        // Right as well: removeIf (Java 8+).
        items.removeIf("item3"::equals);
    }
}

/**
 * Pitfall 2: compound operations are not atomic just because the map is concurrent.
 */
public static class CompoundOperationTrap {
    private final ConcurrentHashMap<String, Integer> counterMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, LongAdder> adderMap = new ConcurrentHashMap<>();

    // Wrong: check-then-act performed as separate map calls.
    public void incrementBadWay(String key) {
        if (!counterMap.containsKey(key)) {
            counterMap.put(key, 1);
            return;
        }
        // Another thread may change the value between this read and the write below.
        int current = counterMap.get(key);
        counterMap.put(key, current + 1);
    }

    // Right: a single atomic map operation.
    public void incrementGoodWay(String key) {
        counterMap.merge(key, 1, Integer::sum);
    }

    // Better: one LongAdder per key.
    public void incrementBestWay(String key) {
        LongAdder adder = adderMap.computeIfAbsent(key, k -> new LongAdder());
        adder.increment();
    }

    /** Returns the LongAdder-based count for {@code key}, or 0 when the key is unknown. */
    public long getValue(String key) {
        LongAdder adder = adderMap.get(key);
        if (adder == null) {
            return 0;
        }
        return adder.sum();
    }
}

/**
 * Pitfall 3: unbounded collections can grow until memory runs out.
 */
public static class MemoryLeakTrap {
    // Problem: an unbounded queue grows for as long as nothing consumes it.
    private final ConcurrentLinkedQueue<LargeObject> queue = new ConcurrentLinkedQueue<>();

    // Remedy 1: a bounded queue.
    private final BlockingQueue<LargeObject> boundedQueue = new LinkedBlockingQueue<>(1000);

    // Remedy 3: hold the elements through weak references.
    private final ConcurrentLinkedQueue<WeakReference<LargeObject>> weakQueue =
            new ConcurrentLinkedQueue<>();

    public void addToQueue(LargeObject obj) {
        queue.offer(obj);
    }

    /** Returns false instead of growing when the bounded queue is full. */
    public boolean addToBoundedQueue(LargeObject obj) {
        boolean accepted = boundedQueue.offer(obj);
        return accepted;
    }

    // Remedy 2: periodically drop entries created more than one hour ago.
    public void cleanupOldEntries() {
        long oneHourMillis = TimeUnit.HOURS.toMillis(1);
        long cutoffTime = System.currentTimeMillis() - oneHourMillis;
        queue.removeIf(candidate -> candidate.getCreateTime() < cutoffTime);
    }

    public void addToWeakQueue(LargeObject obj) {
        WeakReference<LargeObject> reference = new WeakReference<>(obj);
        weakQueue.offer(reference);
    }

    /** Drops references whose target is no longer reachable through them. */
    public void cleanupWeakReferences() {
        weakQueue.removeIf(reference -> reference.get() == null);
    }

    /** Payload object carrying one megabyte of data and its creation time. */
    static class LargeObject {
        private final long createTime;
        private final byte[] data = new byte[1024 * 1024]; // 1MB

        public LargeObject() {
            this.createTime = System.currentTimeMillis();
        }

        public long getCreateTime() {
            return createTime;
        }
    }
}

/**
 * Pitfall 4: every single write to a copy-on-write list copies the whole backing array.
 */
public static class CopyOnWritePerformanceTrap {
    private static final int PREFILL_SIZE = 100000;
    private static final int WRITE_COUNT = 1000;

    private final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

    /**
     * Times {@value #WRITE_COUNT} single adds against one bulk {@code addAll} of the same
     * size on a list prefilled with {@value #PREFILL_SIZE} items, and prints both results.
     */
    public void demonstratePerformanceIssue() {
        // Prefill with one bulk call: the setup is not part of the measurement and must
        // not itself pay one array copy per element.
        List<String> seed = new ArrayList<>(PREFILL_SIZE);
        for (int i = 0; i < PREFILL_SIZE; i++) {
            seed.add("item-" + i);
        }
        list.addAll(seed);

        long startTime = System.nanoTime();

        // Problem: each add copies the entire array.
        for (int i = 0; i < WRITE_COUNT; i++) {
            list.add("new-item-" + i);
        }

        long endTime = System.nanoTime();
        System.out.printf("1000次写操作耗时: %.2f ms%n",
                (endTime - startTime) / 1_000_000.0);

        // Remedy: collect the new items first and add them in one call (one array copy).
        startTime = System.nanoTime();
        List<String> newItems = new ArrayList<>(WRITE_COUNT);
        for (int i = 0; i < WRITE_COUNT; i++) {
            newItems.add("batch-item-" + i);
        }
        list.addAll(newItems);
        endTime = System.nanoTime();

        System.out.printf("批量添加1000项耗时: %.2f ms%n",
                (endTime - startTime) / 1_000_000.0);
    }
}

/**
* 陷阱5:假共享问题
*/
public static class FalseSharingTrap {
// ❌ 问题:多个原子变量在同一缓存行中
private final AtomicLong counter1 = new AtomicLong(0);
private final AtomicLong counter2 = new AtomicLong(0);

public void incrementCounters() {
counter1.incrementAndGet(); // 可能导致缓存行失效,影响counter2的性能
counter2.incrementAndGet();
}

// ✅ 解决方案:使用@Contended注解或填充字节避免假共享
@sun.misc.Contended // 需要JVM参数:-XX:-RestrictContended
private static class PaddedCounters {
private volatile long p1, p2, p3, p4, p5, p6, p7; // 填充
private final AtomicLong counter1 = new AtomicLong(0);
private volatile long q1, q2, q3, q4, q5, q6, q7; // 填充

private final AtomicLong counter2 = new AtomicLong(0);
private volatile long r1, r2, r3, r4, r5, r6, r7; // 填充
}

private final PaddedCounters paddedCounters = new PaddedCounters();

public void incrementPaddedCounters() {
paddedCounters.counter1.incrementAndGet();
paddedCounters.counter2.incrementAndGet();
}

// ✅ 更简单的解决方案:使用LongAdder
private final LongAdder adder1 = new LongAdder();
private final LongAdder adder2 = new LongAdder();

public void incrementAdders() {
adder1.increment();
adder2.increment();
}
}
}

2. 最佳实践指南

public class ConcurrentCollectionBestPractices {

/**
 * Best practice 1: size collections up front.
 */
public static class CapacityPreallocation {

    /**
     * Creates a map sized for {@code expectedSize} entries.
     *
     * <p>The expected element count is passed straight to the constructor, which applies
     * the load factor itself when sizing the table. Dividing by the load factor first
     * over-allocates, and for very large sizes the result overflows to a negative
     * capacity.
     *
     * @param expectedSize number of entries the map should hold without resizing
     * @throws IllegalArgumentException if {@code expectedSize} is negative
     */
    public ConcurrentHashMap<String, String> createOptimizedMap(int expectedSize) {
        float loadFactor = 0.75f;
        return new ConcurrentHashMap<>(expectedSize, loadFactor);
    }

    /**
     * Creates a bounded queue with ten slots per thread of the larger side.
     *
     * @throws IllegalArgumentException if both counts are zero or negative, because the
     *         resulting capacity is not positive
     */
    public BlockingQueue<Runnable> createOptimizedQueue(int producerCount, int consumerCount) {
        // The capacity is a rough allowance for the gap between producer and consumer speed.
        int queueCapacity = Math.max(producerCount, consumerCount) * 10;
        return new LinkedBlockingQueue<>(queueCapacity);
    }
}

/**
* 最佳实践2:批量操作优化
*/
public static class BatchOperationOptimization {
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// ❌ 低效:逐个操作
public void batchUpdateBad(Map<String, Integer> newData) {
newData.forEach((key, value) -> {
map.put(key, value);
});
}

// ✅ 高效:批量操作
public void batchUpdateGood(Map<String, Integer> newData) {
map.putAll(newData);
}

// ✅ 更高效:使用并行流
public void batchUpdateParallel(Map<String, Integer> newData) {
newData.entrySet()
.parallelStream()
.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
}

// ✅ 原子性批量操作
public void atomicBatchUpdate(Map<String, Integer> updates) {
Map<String, Integer> snapshot = new HashMap<>(map);
snapshot.putAll(updates);

// 原子性地替换整个map
map.clear();
map.putAll(snapshot);
}
}

/**
* 最佳实践3:监控和调试
*/
public static class MonitoringAndDebugging {
private final ConcurrentHashMap<String, Object> collection = new ConcurrentHashMap<>();
private final AtomicLong totalOperations = new AtomicLong(0);
private final AtomicLong failedOperations = new AtomicLong(0);

public Object get(String key) {
totalOperations.incrementAndGet();
try {
Object result = collection.get(key);
return result;
} catch (Exception e) {
failedOperations.incrementAndGet();
throw e;
}
}

public void put(String key, Object value) {
totalOperations.incrementAndGet();
try {
collection.put(key, value);
} catch (Exception e) {
failedOperations.incrementAndGet();
throw e;
}
}

// 监控指标
public void printMetrics() {
long total = totalOperations.get();
long failed = failedOperations.get();
double successRate = total == 0 ? 0.0 : (double) (total - failed) / total;

System.out.println("=== 集合监控指标 ===");
System.out.println("总操作数: " + total);
System.out.println("失败操作数: " + failed);
System.out.println("成功率: " + String.format("%.2f%%", successRate * 100));
System.out.println("集合大小: " + collection.size());
}

// 定期监控
public static void scheduleMonitoring(ConcurrentCollectionBestPractices instance) {
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(instance::printMetrics, 30, 30, TimeUnit.SECONDS);
}
}

/**
* 最佳实践4:资源管理
*/
public static class ResourceManagement {

// ✅ 好的做法:使用try-with-resources管理ExecutorService
public void autoManagedResources() {
try (ExecutorService executor = Executors.newFixedThreadPool(4)) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

// 使用资源
for (int i = 0; i < 100; i++) {
final int taskId = i;
executor.submit(() -> processTask(taskId));
}

// 等待所有任务完成
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

// ✅ 好的做法:使用工具类简化资源管理
public static class ResourceManager implements AutoCloseable {
private final List<ExecutorService> executors = new ArrayList<>();
private final List<ScheduledExecutorService> scheduledExecutors = new ArrayList<>();

public ExecutorService newFixedThreadPool(int nThreads) {
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
executors.add(executor);
return executor;
}

public ScheduledExecutorService newSingleThreadScheduledExecutor() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
scheduledExecutors.add(executor);
return executor;
}

@Override
public void close() {
// 关闭所有线程池
executors.forEach(executor -> {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
});

scheduledExecutors.forEach(executor -> {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
});
}
}

private static void processTask(int taskId) {
System.out.println("Processing task: " + taskId);
}
}

/**
 * Best practice 5: exception handling and recovery.
 *
 * <p>Runs submitted tasks on a background worker thread that is started on
 * demand. A task that throws is reported and skipped; it does not stop the
 * tasks queued behind it.
 */
public static class ExceptionHandlingAndRecovery {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    // True while a worker thread owns the queue; guards against starting two.
    private final AtomicBoolean isProcessing = new AtomicBoolean(false);

    /**
     * Queues a task and makes sure a worker is running to execute it.
     *
     * @param task work to run on the worker thread
     */
    public void submitTask(Runnable task) {
        taskQueue.offer(task);
        startProcessingIfNecessary();
    }

    /** Starts a worker thread unless one is already active. */
    private void startProcessingIfNecessary() {
        if (isProcessing.compareAndSet(false, true)) {
            new Thread(this::processTasks).start();
        }
    }

    /**
     * Worker loop: drains the queue, then exits.
     *
     * <p>The queue is read with the non-blocking {@code poll()} so that an idle
     * worker terminates instead of parking forever on an empty queue as an
     * unreachable non-daemon thread.
     */
    private void processTasks() {
        try {
            Runnable task;
            while (!Thread.currentThread().isInterrupted()
                    && (task = taskQueue.poll()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    // Report the failure and carry on with the next task.
                    // Failed tasks could be re-queued or sent to a dead-letter queue here.
                    System.err.println("任务执行失败: " + e.getMessage());
                }
            }
        } finally {
            isProcessing.set(false);
            // A task may have been queued after the last poll() but before the
            // flag was cleared; its submitter saw an active worker and started
            // none, so this worker hands over to a fresh one.
            if (!taskQueue.isEmpty()) {
                startProcessingIfNecessary();
            }
        }
    }
}
}

📚 学习路径与总结

学习路线图

初学者阶段(1-2周)

  1. 理解线程安全问题:传统集合在多线程环境下的问题
  2. 掌握基础并发集合:ConcurrentHashMap、CopyOnWriteArrayList
  3. 理解阻塞队列:生产者-消费者模式
  4. 实践简单应用:多线程计数器、缓存

进阶阶段(2-3周)

  1. 深入设计原理:CAS、分段锁、写时复制机制
  2. 掌握跳表集合:ConcurrentSkipListMap/Set
  3. 性能调优技巧:容量预分配、批量操作
  4. 复杂应用场景:分布式缓存、实时数据分析

专家阶段(3-4周)

  1. 源码级别分析:JUC集合的底层实现
  2. 性能基准测试:不同场景下的性能对比
  3. 自定义并发集合:根据特殊需求设计并发数据结构
  4. 大规模应用:高并发系统的并发集合选择和优化

核心要点总结

必须掌握的概念

  • 线程安全机制:锁、CAS、写时复制
  • 性能特征:读写性能、内存使用、扩展性
  • 适用场景:读写比例、有序需求、容量限制
  • API特点:原子方法、弱一致性迭代器、特殊语义

实战应用关键点

  • 正确选择集合类型:根据业务场景选择最优实现
  • 避免常见陷阱:复合操作、内存泄漏、性能问题
  • 性能优化技巧:容量规划、批量操作、监控调优
  • 异常处理策略:容错机制、恢复策略

面试准备重点

  • 设计原理:各种并发集合的底层实现机制
  • 性能对比:不同场景下的性能特征分析
  • 实际应用:真实项目中的使用经验和问题解决
  • 源码理解:关键类和方法的作用和实现

💡 学习建议:并发集合是多线程编程的基础工具,不仅要掌握API使用,更要理解设计思想和适用场景。建议每个知识点都通过实际编程验证,培养并发编程的思维模式。

接下来建议学习顺序:

  1. ConcurrentHashMap深度解析 - 最常用的并发Map
  2. 并发队列体系 - 生产者消费者模式
  3. 线程池与并发集合 - 集合在线程池中的应用
  4. 并发编程模式 - 整体并发编程知识体系