Java 线程池完全指南 📚
本文适合:零基础初学者、准备面试的工程师、需要系统性掌握线程池的开发者
🚀 目录
- 1. 什么是线程池?为什么需要它?
- 2. 线程池的基本使用
- 3. ThreadPoolExecutor 核心参数详解
- 4. 任务执行流程图解
- 5. 拒绝策略全解析
- 6. 实战:创建自定义线程池
- 7. Executors 工具类的"陷阱"
- 8. 面试高频考点
- 9. 性能调优与监控
- 10. 常见问题与解决方案
1. 什么是线程池?为什么需要它?
🤔 先看一个问题
// 传统方式:每次任务都创建新线程
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 执行任务...
System.out.println("处理任务: " + Thread.currentThread().getName());
}).start();
}
问题:这样写有什么问题?
- 资源浪费:创建线程需要系统资源,1000个线程会消耗大量内存
- 性能下降:频繁创建和销毁线程会降低系统性能
- 不可控:无法控制线程数量,可能导致系统崩溃
💡 线程池的概念
线程池 = 线程的复用池子
就像游泳池一样:
- 游泳池有固定的泳道(核心线程数)
- 人多时可以临时增加泳道(最大线程数)
- 没人时关闭临时泳道(线程回收)
// 线程池方式:线程复用
ExecutorService pool = Executors.newFixedThreadPool(10); // 创建10个线程的池子
for (int i = 0; i < 1000; i++) {
pool.execute(() -> { // 提交任务到池子
// 执行任务...
System.out.println("处理任务: " + Thread.currentThread().getName());
});
}
pool.shutdown(); // 关闭池子
}
优势对比:
| 方面 | 传统方式 | 线程池方式 |
|---|---|---|
| 线程创建 | 每次新建 | 复用已有线程 |
| 资源消耗 | 高 | 低 |
| 性能 | 差 | 好 |
| 可控性 | 难控制 | 易控制 |
2. 线程池的基本使用
2.1 ExecutorService 接口
// 创建线程池的几种方式
ExecutorService pool1 = Executors.newFixedThreadPool(5); // 固定大小
ExecutorService pool2 = Executors.newCachedThreadPool(); // 可变大小
ExecutorService pool3 = Executors.newSingleThreadExecutor(); // 单线程
ExecutorService pool4 = Executors.newScheduledThreadPool(3); // 定时任务
// 提交任务的方式
pool1.execute(new RunnableTask()); // 无返回值
Future<String> future = pool1.submit(new CallableTask()); // 有返回值
// 关闭线程池
pool1.shutdown(); // 平滑关闭
pool1.shutdownNow(); // 立即关闭
2.2 完整示例
public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(3);
// 2. 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
pool.submit(() -> {
System.out.println("任务 " + taskId + " 开始执行,线程: "
+ Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务 " + taskId + " 执行完毕");
});
}
// 3. 关闭线程池
pool.shutdown();
try {
// 等待所有任务完成
pool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
pool.shutdownNow();
}
}
}
3. ThreadPoolExecutor 核心参数详解
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
);
3.1 参数详解(生活中的类比)
| 参数 | 作用 | 生活类比 | 设置建议 |
|---|---|---|---|
corePoolSize | 核心线程数 | 公司正式员工 | CPU密集型: CPU核心数+1 IO密集型: CPU核心数*2 |
maximumPoolSize | 最大线程数 | 公司最多容纳人数 | 考虑系统承载能力,避免OOM |
keepAliveTime | 空闲线程存活时间 | 兼职人员没活多久辞退 | IO密集型可长,CPU密集型可短 |
workQueue | 任务队列 | 待办事项列表 | 有界队列防止内存溢出 |
threadFactory | 线程工厂 | 员工命名规则 | 设置有意义的线程名,便于排查 |
handler | 拒绝策略 | 任务太多怎么办 | 根据业务需要选择策略 |
3.2 任务队列的选择
// 1. 有界队列 - 推荐
new ArrayBlockingQueue<>(100); // 先进先出,固定大小
new LinkedBlockingQueue<>(100); // 先进先出,可设容量
// 2. 无界队列 - 谨慎使用
new LinkedBlockingQueue<>(); // 可能导致内存溢出
// 3. 同步队列 - 直接传递
new SynchronousQueue<>(); // 不存储任务,直接交给线程执行
// 4. 优先级队列
new PriorityBlockingQueue<>(); // 根据优先级执行任务
4. 任务执行流程图解
流程说明:
- 第一步:线程数小于核心线程数 → 创建新线程
- 第二步:核心线程已满 → 尝试放入队列
- 第三步:队列已满 → 尝试创建非核心线程
- 第四步:达到最大线程数 → 执行拒绝策略
5. 拒绝策略全解析
当任务队列满了,线程数达到最大,新任务来临时怎么办?
5.1 JDK内置的4种策略
// 1. 默认策略:抛出异常
RejectedExecutionHandler abortPolicy = new ThreadPoolExecutor.AbortPolicy();
// 抛出 RejectedExecutionException 异常
// 2. 调用者运行策略
RejectedExecutionHandler callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
// 由提交任务的线程自己执行
// 3. 丢弃策略
RejectedExecutionHandler discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
// 静默丢弃任务,不通知
// 4. 丢弃最老任务策略
RejectedExecutionHandler discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
// 丢弃队列中最老的任务,执行新任务
5.2 实际应用示例
public class RejectionPolicyExample {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略
);
// 提交7个任务(2+4+2=8个容量,这里故意测试)
for (int i = 1; i <= 7; i++) {
final int taskId = i;
System.out.println("提交任务: " + taskId);
pool.execute(() -> {
System.out.println("执行任务: " + taskId + ", 线程: "
+ Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
pool.shutdown();
}
}
5.3 自定义拒绝策略
// 自定义拒绝策略:记录日志并降级处理
public class CustomRejectionHandler implements RejectedExecutionHandler {
private static final Logger logger = LoggerFactory.getLogger(CustomRejectionHandler.class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.error("任务被拒绝执行: {}", r);
// 可以选择:1.放入队列重试 2.写入数据库 3.发送MQ
if (!executor.isShutdown()) {
try {
// 尝试放入队列,失败则记录
if (!executor.getQueue().offer(r, 100, TimeUnit.MILLISECONDS)) {
logger.error("重试放入队列失败,任务将被丢弃");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
6. 实战:创建自定义线程池
6.1 推荐的线程池配置
public class ThreadPoolConfig {
/**
* IO密集型线程池
* 适用于:文件读写、网络请求、数据库操作
*/
public static ThreadPoolExecutor createIOThreadPool() {
int coreSize = Runtime.getRuntime().availableProcessors() * 2;
int maxSize = coreSize * 2;
return new ThreadPoolExecutor(
coreSize,
maxSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("io-worker"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* CPU密集型线程池
* 适用于:计算密集、算法处理、图像渲染
*/
public static ThreadPoolExecutor createCPUThreadPool() {
int coreSize = Runtime.getRuntime().availableProcessors() + 1;
return new ThreadPoolExecutor(
coreSize,
coreSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
new NamedThreadFactory("cpu-worker"),
new ThreadPoolExecutor.AbortPolicy()
);
}
/**
* 自定义线程工厂
*/
static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public NamedThreadFactory(String namePrefix) {
this.namePrefix = "pool-" + namePrefix + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
6.2 线程池使用最佳实践
public class ThreadPoolBestPractices {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = ThreadPoolConfig.createIOThreadPool();
try {
// 1. 提交任务并获取结果
Future<String> future = pool.submit(() -> {
Thread.sleep(2000);
return "任务完成";
});
// 2. 设置超时获取结果
String result = future.get(3, TimeUnit.SECONDS);
System.out.println(result);
// 3. 批量提交任务
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int taskId = i;
futures.add(pool.submit(() -> {
Thread.sleep(1000);
return taskId * taskId;
}));
}
// 4. 获取所有结果
for (Future<Integer> f : futures) {
System.out.println(f.get());
}
} finally {
// 5. 优雅关闭
shutdownPool(pool);
}
}
/**
* 优雅关闭线程池
*/
private static void shutdownPool(ThreadPoolExecutor pool) {
pool.shutdown(); // 不接受新任务
try {
// 等待现有任务完成
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 强制关闭
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能完全关闭");
}
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
7. Executors 工具类的"陷阱"
7.1 为什么生产环境不建议使用Executors?
// ❌ 不推荐的方式
ExecutorService pool1 = Executors.newFixedThreadPool(10); // 队列无界,可能OOM
ExecutorService pool2 = Executors.newCachedThreadPool(); // 线程无界,可能撑爆
ExecutorService pool3 = Executors.newSingleThreadExecutor(); // 队列无界
问题分析:
newFixedThreadPool: 使用LinkedBlockingQueue,容量为Integer.MAX_VALUEnewCachedThreadPool: 最大线程数为Integer.MAX_VALUEnewSingleThreadExecutor: 同样使用无界队列
7.2 推荐的替代方案
// ✅ 推荐的方式
public class SafeThreadPool {
public static ThreadPoolExecutor createSafeFixedPool(int size) {
return new ThreadPoolExecutor(
size, size,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), // 使用有界队列
new NamedThreadFactory("safe-pool"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
public static ThreadPoolExecutor createSafeCachedPool() {
return new ThreadPoolExecutor(
0, 50, // 限制最大线程数
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new NamedThreadFactory("safe-cached"),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
8. 面试高频考点
8.1 线程池大小如何确定?
/**
* CPU密集型任务(计算多,IO少)
* 线程数 = CPU核心数 + 1
*/
int cpuIntensiveSize = Runtime.getRuntime().availableProcessors() + 1;
/**
* IO密集型任务(等待多,计算少)
* 线程数 = CPU核心数 * (1 + 平均等待时间/平均计算时间)
*/
int ioIntensiveSize = Runtime.getRuntime().availableProcessors() * 2;
/**
* 混合型任务
* 根据实际压测结果调整
*/
8.2 核心面试题详解
Q1: 线程池的工作原理是什么?
答:线程池的工作分为以下几个步骤:
- 提交任务时,如果当前线程数小于
corePoolSize,创建新线程执行 - 否则尝试将任务放入
workQueue - 如果队列已满,且线程数小于
maximumPoolSize,创建非核心线程 - 如果队列已满且线程数达到最大,执行拒绝策略
Q2: shutdown() 和 shutdownNow() 的区别?
// shutdown(): 平滑关闭
// 1. 不再接受新任务
// 2. 已提交的任务会继续执行
// 3. 等待所有任务完成后关闭
// shutdownNow(): 强制关闭
// 1. 不再接受新任务
// 2. 尝试中断正在执行的任务
// 3. 清空队列中的等待任务
// 4. 返回未执行的任务列表
Q3: 如何避免线程池中的任务丢失?
// 方案1: 使用可靠的拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者执行
// 方案2: 重试机制
public class RetryRejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 放入MQ或数据库重试
retryQueue.offer(r);
}
}
// 方案3: 优雅关闭确保任务完成
public static void gracefulShutdown(ThreadPoolExecutor pool) {
pool.shutdown();
try {
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
Q4: 线程池的参数如何动态调整?
ThreadPoolExecutor pool = new ThreadPoolExecutor(...);
// 运行时调整参数
pool.setCorePoolSize(20); // 调整核心线程数
pool.setMaximumPoolSize(50); // 调整最大线程数
pool.setKeepAliveTime(120, TimeUnit.SECONDS); // 调整存活时间
// 可以结合配置中心实现热更新
@Scheduled(fixedRate = 60000) // 每分钟检查
public void adjustPoolSize() {
PoolConfig config = configCenter.getConfig();
pool.setCorePoolSize(config.getCoreSize());
pool.setMaximumPoolSize(config.getMaxSize());
}
8.3 面试必问场景题
场景:高并发接口中使用线程池,如何设计?
public class HighConcurrencyService {
private final ThreadPoolExecutor businessPool;
private final ThreadPoolExecutor asyncPool;
public HighConcurrencyService() {
// 业务线程池:核心接口处理
this.businessPool = new ThreadPoolExecutor(
20, 50, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
new NamedThreadFactory("business"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 异步线程池:非核心任务
this.asyncPool = new ThreadPoolExecutor(
10, 30, 120, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("async"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
/**
* 核心业务接口
*/
public CompletableFuture<Result> processRequest(Request request) {
// 快速验证,无效直接返回
if (!isValid(request)) {
return CompletableFuture.completedFuture(Result.error("参数无效"));
}
return CompletableFuture.supplyAsync(() -> {
// 核心业务逻辑
return doBusinessLogic(request);
}, businessPool)
.thenApplyAsync(result -> {
// 异步后续处理:日志、通知等
asyncPool.execute(() -> {
logResult(result);
sendNotification(result);
});
return result;
}, businessPool)
.exceptionally(throwable -> {
// 异常处理
log.error("业务处理失败", throwable);
return Result.error("系统繁忙");
});
}
}
9. 性能调优与监控
9.1 线程池监控指标
public class ThreadPoolMonitor {
private final ThreadPoolExecutor pool;
public ThreadPoolMonitor(ThreadPoolExecutor pool) {
this.pool = pool;
}
/**
* 获取线程池状态信息
*/
public PoolStatus getStatus() {
return PoolStatus.builder()
.coreSize(pool.getCorePoolSize())
.maximumSize(pool.getMaximumPoolSize())
.activeCount(pool.getActiveCount())
.poolSize(pool.getPoolSize())
.taskCount(pool.getTaskCount())
.completedTaskCount(pool.getCompletedTaskCount())
.queueSize(pool.getQueue().size())
.queueRemainingCapacity(pool.getQueue().remainingCapacity())
.isShutdown(pool.isShutdown())
.isTerminated(pool.isTerminated())
.build();
}
/**
* 监控线程池健康状态
*/
@Scheduled(fixedRate = 5000) // 每5秒监控一次
public void monitor() {
PoolStatus status = getStatus();
// 检查队列积压
if (status.getQueueSize() > status.getMaximumSize() * 0.8) {
log.warn("任务队列积压严重: {}/{}",
status.getQueueSize(), status.getMaximumSize());
}
// 检查活跃线程比例
double activeRatio = (double) status.getActiveCount() / status.getPoolSize();
if (activeRatio > 0.9) {
log.warn("线程池繁忙: {}/{}",
status.getActiveCount(), status.getPoolSize());
}
// 检查拒绝次数(需要自定义拒绝策略统计)
if (rejectedCount > 100) { // 假设有rejectCount字段
log.warn("拒绝策略频繁触发: {}", rejectedCount);
}
}
}
9.2 性能调优建议
public class ThreadPoolTuning {
/**
* 根据业务场景调整线程池参数
*/
public static ThreadPoolExecutor createOptimizedPool(String businessType) {
switch (businessType) {
case "WEB_API":
// Web接口:快速响应,线程数适中
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new NamedThreadFactory("web-api"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
case "BATCH_JOB":
// 批处理任务:高吞吐,队列容量大
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 4,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("batch-job"),
new ThreadPoolExecutor.AbortPolicy()
);
case "ASYNC_TASK":
// 异步任务:长存活时间,队列适中
return new ThreadPoolExecutor(
5,
20,
300, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new NamedThreadFactory("async-task"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
default:
return createDefaultPool();
}
}
}
10. 常见问题与解决方案
10.1 OOM(内存溢出)
问题:线程池导致内存溢出
原因:
- 任务队列无界
- 最大线程数过大
- 任务对象过大
解决方案:
// 使用有界队列
new ArrayBlockingQueue<>(1000)
// 限制最大线程数
.setMaximumPoolSize(100)
// 使用内存友好的任务对象
Runnable task = () -> {
// 避免在任务中创建大对象
processDataWithSmallBuffer();
};
10.2 死锁
问题:线程池中的任务相互等待
示例:
// ❌ 可能导致死锁
pool.submit(() -> {
Future<String> f = pool.submit(() -> "hello"); // 等待自己执行
System.out.println(f.get()); // 死锁!
});
解决方案:
// ✅ 使用不同的线程池
ThreadPoolExecutor pool1 = new ThreadPoolExecutor(...);
ThreadPoolExecutor pool2 = new ThreadPoolExecutor(...);
pool1.submit(() -> {
Future<String> f = pool2.submit(() -> "hello"); // 使用不同的池
System.out.println(f.get());
});
10.3 任务执行异常
问题:任务异常导致线程终止
解决方案:
// 在任务内部处理异常
pool.submit(() -> {
try {
doSomething();
} catch (Exception e) {
log.error("任务执行失败", e);
// 不抛出异常,避免线程终止
}
});
// 或者使用UncaughtExceptionHandler
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, ex) -> {
log.error("线程异常: " + thread.getName(), ex);
});
return t;
};
10.4 线程泄漏
问题:忘记关闭线程池
解决方案:
public class ResourceManager implements AutoCloseable {
private final ThreadPoolExecutor pool;
public ResourceManager() {
this.pool = new ThreadPoolExecutor(...);
}
public void execute(Runnable task) {
pool.execute(task);
}
@Override
public void close() {
if (pool != null && !pool.isShutdown()) {
pool.shutdown();
try {
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
// 使用try-with-resources自动关闭
try (ResourceManager manager = new ResourceManager()) {
manager.execute(() -> doWork());
manager.execute(() -> doMoreWork());
} // 自动调用close()
🎯 总结
核心要点回顾
- 线程池的本质:线程的复用池,避免频繁创建销毁线程
- 核心参数:corePoolSize、maximumPoolSize、keepAliveTime、workQueue、threadFactory、handler
- 执行流程:核心线程 → 队列 → 非核心线程 → 拒绝策略
- 生产实践:不使用Executors默认方法,自定义参数
- 监控调优:关注队列长度、活跃线程数、任务完成时间
- 常见陷阱:OOM、死锁、线程泄漏、异常处理
面试必背清单
- 线程池的工作原理和参数含义
- shutdown()与shutdownNow()的区别
- 如何确定线程池大小
- 为什么不推荐使用Executors
- 拒绝策略的类型和应用场景
- 如何避免任务丢失
- 线程池监控指标
- 常见问题的解决方案
进阶学习路径
- 深入源码:阅读ThreadPoolExecutor源码
- 分布式线程池:了解分布式环境下的线程池管理
- 线程池框架:学习Hystrix、Sentinel等框架的线程池应用
- 性能压测:使用JMeter等工具进行压力测试
- 监控集成:集成Prometheus、Grafana进行监控
💡 记住:线程池是Java并发编程的基础,掌握它不仅能提升代码质量,也是面试的必考点。建议动手实践每个示例,加深理解!