跳到主要内容

Java 线程池完全指南 📚

本文适合:零基础初学者、准备面试的工程师、需要系统性掌握线程池的开发者

🚀 目录

1. 什么是线程池?为什么需要它?

🤔 先看一个问题

// 传统方式:每次任务都创建新线程
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 执行任务...
System.out.println("处理任务: " + Thread.currentThread().getName());
}).start();
}

问题:这样写有什么问题?

  1. 资源浪费:创建线程需要系统资源,1000个线程会消耗大量内存
  2. 性能下降:频繁创建和销毁线程会降低系统性能
  3. 不可控:无法控制线程数量,可能导致系统崩溃

💡 线程池的概念

线程池 = 线程的复用池子

就像游泳池一样:

  • 游泳池有固定的泳道(核心线程数)
  • 人多时可以临时增加泳道(最大线程数)
  • 没人时关闭临时泳道(线程回收)
// 线程池方式:线程复用
ExecutorService pool = Executors.newFixedThreadPool(10); // 创建10个线程的池子

for (int i = 0; i < 1000; i++) {
pool.execute(() -> { // 提交任务到池子
// 执行任务...
System.out.println("处理任务: " + Thread.currentThread().getName());
});
}

pool.shutdown(); // 关闭池子
}

优势对比

方面传统方式线程池方式
线程创建每次新建复用已有线程
资源消耗
性能
可控性难控制易控制

2. 线程池的基本使用

2.1 ExecutorService 接口

// 创建线程池的几种方式
ExecutorService pool1 = Executors.newFixedThreadPool(5); // 固定大小
ExecutorService pool2 = Executors.newCachedThreadPool(); // 可变大小
ExecutorService pool3 = Executors.newSingleThreadExecutor(); // 单线程
ExecutorService pool4 = Executors.newScheduledThreadPool(3); // 定时任务

// 提交任务的方式
pool1.execute(new RunnableTask()); // 无返回值
Future<String> future = pool1.submit(new CallableTask()); // 有返回值

// 关闭线程池
pool1.shutdown(); // 平滑关闭
pool1.shutdownNow(); // 立即关闭

2.2 完整示例

public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(3);

// 2. 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
pool.submit(() -> {
System.out.println("任务 " + taskId + " 开始执行,线程: "
+ Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务 " + taskId + " 执行完毕");
});
}

// 3. 关闭线程池
pool.shutdown();
try {
// 等待所有任务完成
pool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
pool.shutdownNow();
}
}
}

3. ThreadPoolExecutor 核心参数详解

public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
);

3.1 参数详解(生活中的类比)

参数作用生活类比设置建议
corePoolSize核心线程数公司正式员工CPU密集型: CPU核心数+1 IO密集型: CPU核心数*2
maximumPoolSize最大线程数公司最多容纳人数考虑系统承载能力,避免OOM
keepAliveTime空闲线程存活时间兼职人员没活多久辞退IO密集型可长,CPU密集型可短
workQueue任务队列待办事项列表有界队列防止内存溢出
threadFactory线程工厂员工命名规则设置有意义的线程名,便于排查
handler拒绝策略任务太多怎么办根据业务需要选择策略

3.2 任务队列的选择

// 1. 有界队列 - 推荐
new ArrayBlockingQueue<>(100); // 先进先出,固定大小
new LinkedBlockingQueue<>(100); // 先进先出,可设容量

// 2. 无界队列 - 谨慎使用
new LinkedBlockingQueue<>(); // 可能导致内存溢出

// 3. 同步队列 - 直接传递
new SynchronousQueue<>(); // 不存储任务,直接交给线程执行

// 4. 优先级队列
new PriorityBlockingQueue<>(); // 根据优先级执行任务

4. 任务执行流程图解

流程说明

  1. 第一步:线程数小于核心线程数 → 创建新线程
  2. 第二步:核心线程已满 → 尝试放入队列
  3. 第三步:队列已满 → 尝试创建非核心线程
  4. 第四步:达到最大线程数 → 执行拒绝策略

5. 拒绝策略全解析

当任务队列满了,线程数达到最大,新任务来临时怎么办?

5.1 JDK内置的4种策略

// 1. 默认策略:抛出异常
RejectedExecutionHandler abortPolicy = new ThreadPoolExecutor.AbortPolicy();
// 抛出 RejectedExecutionException 异常

// 2. 调用者运行策略
RejectedExecutionHandler callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
// 由提交任务的线程自己执行

// 3. 丢弃策略
RejectedExecutionHandler discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
// 静默丢弃任务,不通知

// 4. 丢弃最老任务策略
RejectedExecutionHandler discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
// 丢弃队列中最老的任务,执行新任务

5.2 实际应用示例

public class RejectionPolicyExample {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略
);

// 提交7个任务(2+4+2=8个容量,这里故意测试)
for (int i = 1; i <= 7; i++) {
final int taskId = i;
System.out.println("提交任务: " + taskId);

pool.execute(() -> {
System.out.println("执行任务: " + taskId + ", 线程: "
+ Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

pool.shutdown();
}
}

5.3 自定义拒绝策略

// 自定义拒绝策略:记录日志并降级处理
public class CustomRejectionHandler implements RejectedExecutionHandler {
private static final Logger logger = LoggerFactory.getLogger(CustomRejectionHandler.class);

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.error("任务被拒绝执行: {}", r);

// 可以选择:1.放入队列重试 2.写入数据库 3.发送MQ
if (!executor.isShutdown()) {
try {
// 尝试放入队列,失败则记录
if (!executor.getQueue().offer(r, 100, TimeUnit.MILLISECONDS)) {
logger.error("重试放入队列失败,任务将被丢弃");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

6. 实战:创建自定义线程池

6.1 推荐的线程池配置

public class ThreadPoolConfig {

/**
* IO密集型线程池
* 适用于:文件读写、网络请求、数据库操作
*/
public static ThreadPoolExecutor createIOThreadPool() {
int coreSize = Runtime.getRuntime().availableProcessors() * 2;
int maxSize = coreSize * 2;

return new ThreadPoolExecutor(
coreSize,
maxSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("io-worker"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

/**
* CPU密集型线程池
* 适用于:计算密集、算法处理、图像渲染
*/
public static ThreadPoolExecutor createCPUThreadPool() {
int coreSize = Runtime.getRuntime().availableProcessors() + 1;

return new ThreadPoolExecutor(
coreSize,
coreSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
new NamedThreadFactory("cpu-worker"),
new ThreadPoolExecutor.AbortPolicy()
);
}

/**
* 自定义线程工厂
*/
static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public NamedThreadFactory(String namePrefix) {
this.namePrefix = "pool-" + namePrefix + "-thread-";
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}

6.2 线程池使用最佳实践

public class ThreadPoolBestPractices {

public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = ThreadPoolConfig.createIOThreadPool();

try {
// 1. 提交任务并获取结果
Future<String> future = pool.submit(() -> {
Thread.sleep(2000);
return "任务完成";
});

// 2. 设置超时获取结果
String result = future.get(3, TimeUnit.SECONDS);
System.out.println(result);

// 3. 批量提交任务
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int taskId = i;
futures.add(pool.submit(() -> {
Thread.sleep(1000);
return taskId * taskId;
}));
}

// 4. 获取所有结果
for (Future<Integer> f : futures) {
System.out.println(f.get());
}

} finally {
// 5. 优雅关闭
shutdownPool(pool);
}
}

/**
* 优雅关闭线程池
*/
private static void shutdownPool(ThreadPoolExecutor pool) {
pool.shutdown(); // 不接受新任务
try {
// 等待现有任务完成
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 强制关闭
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能完全关闭");
}
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

7. Executors 工具类的"陷阱"

7.1 为什么生产环境不建议使用Executors?

// ❌ 不推荐的方式
ExecutorService pool1 = Executors.newFixedThreadPool(10); // 队列无界,可能OOM
ExecutorService pool2 = Executors.newCachedThreadPool(); // 线程无界,可能撑爆
ExecutorService pool3 = Executors.newSingleThreadExecutor(); // 队列无界

问题分析

  • newFixedThreadPool: 使用 LinkedBlockingQueue,容量为 Integer.MAX_VALUE
  • newCachedThreadPool: 最大线程数为 Integer.MAX_VALUE
  • newSingleThreadExecutor: 同样使用无界队列

7.2 推荐的替代方案

// ✅ 推荐的方式
public class SafeThreadPool {
public static ThreadPoolExecutor createSafeFixedPool(int size) {
return new ThreadPoolExecutor(
size, size,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), // 使用有界队列
new NamedThreadFactory("safe-pool"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

public static ThreadPoolExecutor createSafeCachedPool() {
return new ThreadPoolExecutor(
0, 50, // 限制最大线程数
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new NamedThreadFactory("safe-cached"),
new ThreadPoolExecutor.AbortPolicy()
);
}
}

8. 面试高频考点

8.1 线程池大小如何确定?

/**
* CPU密集型任务(计算多,IO少)
* 线程数 = CPU核心数 + 1
*/
int cpuIntensiveSize = Runtime.getRuntime().availableProcessors() + 1;

/**
* IO密集型任务(等待多,计算少)
* 线程数 = CPU核心数 * (1 + 平均等待时间/平均计算时间)
*/
int ioIntensiveSize = Runtime.getRuntime().availableProcessors() * 2;

/**
* 混合型任务
* 根据实际压测结果调整
*/

8.2 核心面试题详解

Q1: 线程池的工作原理是什么?

:线程池的工作分为以下几个步骤:

  1. 提交任务时,如果当前线程数小于 corePoolSize,创建新线程执行
  2. 否则尝试将任务放入 workQueue
  3. 如果队列已满,且线程数小于 maximumPoolSize,创建非核心线程
  4. 如果队列已满且线程数达到最大,执行拒绝策略

Q2: shutdown() 和 shutdownNow() 的区别?

// shutdown(): 平滑关闭
// 1. 不再接受新任务
// 2. 已提交的任务会继续执行
// 3. 等待所有任务完成后关闭

// shutdownNow(): 强制关闭
// 1. 不再接受新任务
// 2. 尝试中断正在执行的任务
// 3. 清空队列中的等待任务
// 4. 返回未执行的任务列表

Q3: 如何避免线程池中的任务丢失?

// 方案1: 使用可靠的拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者执行

// 方案2: 重试机制
public class RetryRejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 放入MQ或数据库重试
retryQueue.offer(r);
}
}

// 方案3: 优雅关闭确保任务完成
public static void gracefulShutdown(ThreadPoolExecutor pool) {
pool.shutdown();
try {
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}

Q4: 线程池的参数如何动态调整?

ThreadPoolExecutor pool = new ThreadPoolExecutor(...);

// 运行时调整参数
pool.setCorePoolSize(20); // 调整核心线程数
pool.setMaximumPoolSize(50); // 调整最大线程数
pool.setKeepAliveTime(120, TimeUnit.SECONDS); // 调整存活时间

// 可以结合配置中心实现热更新
@Scheduled(fixedRate = 60000) // 每分钟检查
public void adjustPoolSize() {
PoolConfig config = configCenter.getConfig();
pool.setCorePoolSize(config.getCoreSize());
pool.setMaximumPoolSize(config.getMaxSize());
}

8.3 面试必问场景题

场景:高并发接口中使用线程池,如何设计?

public class HighConcurrencyService {
private final ThreadPoolExecutor businessPool;
private final ThreadPoolExecutor asyncPool;

public HighConcurrencyService() {
// 业务线程池:核心接口处理
this.businessPool = new ThreadPoolExecutor(
20, 50, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
new NamedThreadFactory("business"),
new ThreadPoolExecutor.CallerRunsPolicy()
);

// 异步线程池:非核心任务
this.asyncPool = new ThreadPoolExecutor(
10, 30, 120, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("async"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}

/**
* 核心业务接口
*/
public CompletableFuture<Result> processRequest(Request request) {
// 快速验证,无效直接返回
if (!isValid(request)) {
return CompletableFuture.completedFuture(Result.error("参数无效"));
}

return CompletableFuture.supplyAsync(() -> {
// 核心业务逻辑
return doBusinessLogic(request);
}, businessPool)
.thenApplyAsync(result -> {
// 异步后续处理:日志、通知等
asyncPool.execute(() -> {
logResult(result);
sendNotification(result);
});
return result;
}, businessPool)
.exceptionally(throwable -> {
// 异常处理
log.error("业务处理失败", throwable);
return Result.error("系统繁忙");
});
}
}

9. 性能调优与监控

9.1 线程池监控指标

public class ThreadPoolMonitor {
private final ThreadPoolExecutor pool;

public ThreadPoolMonitor(ThreadPoolExecutor pool) {
this.pool = pool;
}

/**
* 获取线程池状态信息
*/
public PoolStatus getStatus() {
return PoolStatus.builder()
.coreSize(pool.getCorePoolSize())
.maximumSize(pool.getMaximumPoolSize())
.activeCount(pool.getActiveCount())
.poolSize(pool.getPoolSize())
.taskCount(pool.getTaskCount())
.completedTaskCount(pool.getCompletedTaskCount())
.queueSize(pool.getQueue().size())
.queueRemainingCapacity(pool.getQueue().remainingCapacity())
.isShutdown(pool.isShutdown())
.isTerminated(pool.isTerminated())
.build();
}

/**
* 监控线程池健康状态
*/
@Scheduled(fixedRate = 5000) // 每5秒监控一次
public void monitor() {
PoolStatus status = getStatus();

// 检查队列积压
if (status.getQueueSize() > status.getMaximumSize() * 0.8) {
log.warn("任务队列积压严重: {}/{}",
status.getQueueSize(), status.getMaximumSize());
}

// 检查活跃线程比例
double activeRatio = (double) status.getActiveCount() / status.getPoolSize();
if (activeRatio > 0.9) {
log.warn("线程池繁忙: {}/{}",
status.getActiveCount(), status.getPoolSize());
}

// 检查拒绝次数(需要自定义拒绝策略统计)
if (rejectedCount > 100) { // 假设有rejectCount字段
log.warn("拒绝策略频繁触发: {}", rejectedCount);
}
}
}

9.2 性能调优建议

public class ThreadPoolTuning {

/**
* 根据业务场景调整线程池参数
*/
public static ThreadPoolExecutor createOptimizedPool(String businessType) {
switch (businessType) {
case "WEB_API":
// Web接口:快速响应,线程数适中
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new NamedThreadFactory("web-api"),
new ThreadPoolExecutor.CallerRunsPolicy()
);

case "BATCH_JOB":
// 批处理任务:高吞吐,队列容量大
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 4,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("batch-job"),
new ThreadPoolExecutor.AbortPolicy()
);

case "ASYNC_TASK":
// 异步任务:长存活时间,队列适中
return new ThreadPoolExecutor(
5,
20,
300, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new NamedThreadFactory("async-task"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);

default:
return createDefaultPool();
}
}
}

10. 常见问题与解决方案

10.1 OOM(内存溢出)

问题:线程池导致内存溢出

原因

  • 任务队列无界
  • 最大线程数过大
  • 任务对象过大

解决方案

// 使用有界队列
new ArrayBlockingQueue<>(1000)

// 限制最大线程数
.setMaximumPoolSize(100)

// 使用内存友好的任务对象
Runnable task = () -> {
// 避免在任务中创建大对象
processDataWithSmallBuffer();
};

10.2 死锁

问题:线程池中的任务相互等待

示例

// ❌ 可能导致死锁
pool.submit(() -> {
Future<String> f = pool.submit(() -> "hello"); // 等待自己执行
System.out.println(f.get()); // 死锁!
});

解决方案

// ✅ 使用不同的线程池
ThreadPoolExecutor pool1 = new ThreadPoolExecutor(...);
ThreadPoolExecutor pool2 = new ThreadPoolExecutor(...);

pool1.submit(() -> {
Future<String> f = pool2.submit(() -> "hello"); // 使用不同的池
System.out.println(f.get());
});

10.3 任务执行异常

问题:任务异常导致线程终止

解决方案

// 在任务内部处理异常
pool.submit(() -> {
try {
doSomething();
} catch (Exception e) {
log.error("任务执行失败", e);
// 不抛出异常,避免线程终止
}
});

// 或者使用UncaughtExceptionHandler
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, ex) -> {
log.error("线程异常: " + thread.getName(), ex);
});
return t;
};

10.4 线程泄漏

问题:忘记关闭线程池

解决方案

public class ResourceManager implements AutoCloseable {
private final ThreadPoolExecutor pool;

public ResourceManager() {
this.pool = new ThreadPoolExecutor(...);
}

public void execute(Runnable task) {
pool.execute(task);
}

@Override
public void close() {
if (pool != null && !pool.isShutdown()) {
pool.shutdown();
try {
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}

// 使用try-with-resources自动关闭
try (ResourceManager manager = new ResourceManager()) {
manager.execute(() -> doWork());
manager.execute(() -> doMoreWork());
} // 自动调用close()

🎯 总结

核心要点回顾

  1. 线程池的本质:线程的复用池,避免频繁创建销毁线程
  2. 核心参数:corePoolSize、maximumPoolSize、keepAliveTime、workQueue、threadFactory、handler
  3. 执行流程:核心线程 → 队列 → 非核心线程 → 拒绝策略
  4. 生产实践:不使用Executors默认方法,自定义参数
  5. 监控调优:关注队列长度、活跃线程数、任务完成时间
  6. 常见陷阱:OOM、死锁、线程泄漏、异常处理

面试必背清单

  • 线程池的工作原理和参数含义
  • shutdown()与shutdownNow()的区别
  • 如何确定线程池大小
  • 为什么不推荐使用Executors
  • 拒绝策略的类型和应用场景
  • 如何避免任务丢失
  • 线程池监控指标
  • 常见问题的解决方案

进阶学习路径

  1. 深入源码:阅读ThreadPoolExecutor源码
  2. 分布式线程池:了解分布式环境下的线程池管理
  3. 线程池框架:学习Hystrix、Sentinel等框架的线程池应用
  4. 性能压测:使用JMeter等工具进行压力测试
  5. 监控集成:集成Prometheus、Grafana进行监控

💡 记住:线程池是Java并发编程的基础,掌握它不仅能提升代码质量,也是面试的必考点。建议动手实践每个示例,加深理解!