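Using thread pools in production means weighing resource management, task handling, error recovery, and monitoring together. The following is a comprehensive guide to using thread pools at production grade:
I. Best practices for creating thread pools
1. Avoid the Executors factory shortcuts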
```java
// Anti-pattern: both factory methods can lead to OutOfMemoryError
ExecutorService unsafe = Executors.newCachedThreadPool();   // unbounded thread count (max = Integer.MAX_VALUE)
ExecutorService unsafe2 = Executors.newFixedThreadPool(10); // unbounded LinkedBlockingQueue

// Preferred: construct ThreadPoolExecutor explicitly
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // bounded queue
RejectedExecutionHandler handler = new CustomRejectionPolicy();
ExecutorService executor = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        60L, TimeUnit.SECONDS,                  // keepAliveTime for threads above corePoolSize
        workQueue,
        new CustomThreadFactory("app-worker-"),
        handler
);
```
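The order in which ThreadPoolExecutor uses its resources is easy to get wrong: a new task first gets a core thread, then waits in the queue, and only when the queue is full does the pool grow toward maximumPoolSize; once that is exhausted too, the rejection handler runs. The sketch below demonstrates this with deliberately tiny, illustrative sizes (1 core thread, 2 max, queue capacity 1) and the JDK's built-in AbortPolicy; the class name PoolGrowthDemo is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    /** Submits four blocking tasks and records the pool state after each submission. */
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so submitted work piles up
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size()).append('\n');
                } catch (RejectedExecutionException e) {
                    log.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
        return log.toString();
    }
}
```

PoolGrowthDemo.run() should return four lines: `task1: threads=1 queued=0`, `task2: threads=1 queued=1`, `task3: threads=2 queued=1`, `task4: rejected`. With an unbounded queue the third step never happens, which is why maximumPoolSize has no effect on Executors.newFixedThreadPool.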
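2. Key configuration parameters
- corePoolSize: number of threads kept alive permanently (tune to the workload type)
- maximumPoolSize: upper bound on threads; threads beyond corePoolSize are created only after the queue is full (this guide suggests staying under 100)
- keepAliveTime: how long threads above corePoolSize may stay idle before they are reclaimed (30 to 120 seconds)
- workQueue: must be a bounded queue (to avoid OOM)
- threadFactory: custom thread factory (naming, exception handling)
- rejectedExecutionHandler: custom rejection policy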
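For "tune to the workload type", a commonly cited starting point (from Java Concurrency in Practice) is N_threads = N_cpu × U_target × (1 + W/C), where W/C is the ratio of time a task spends waiting (I/O, locks) to time spent computing. The helper below is a sketch of that formula; PoolSizing is a hypothetical name, the wait and compute times are inputs you would measure yourself, and the result is a starting value to validate under load, not a final answer.

```java
class PoolSizing {
    /**
     * Estimates a thread count as nCpu * targetUtilization * (1 + waitMillis / computeMillis).
     *
     * @param nCpu              available processors
     * @param targetUtilization desired CPU utilization, in (0, 1]
     * @param waitMillis        average time a task spends blocked (I/O, locks)
     * @param computeMillis     average time a task spends on the CPU, must be > 0
     */
    static int estimateThreads(int nCpu, double targetUtilization, double waitMillis, double computeMillis) {
        if (nCpu <= 0 || targetUtilization <= 0 || targetUtilization > 1
                || computeMillis <= 0 || waitMillis < 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        double threads = nCpu * targetUtilization * (1 + waitMillis / computeMillis);
        // Round to the nearest whole thread and never go below one
        return Math.max(1, (int) Math.round(threads));
    }
}
```

For example, on 8 cores a CPU-bound task (no waiting) gives estimateThreads(8, 1.0, 0, 10) = 8, while a task that waits 90 ms for every 10 ms of computation gives estimateThreads(8, 1.0, 90, 10) = 80.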
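II. Implementing key thread pool components
1. Custom thread factory (naming, exception handling)
```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomThreadFactory implements ThreadFactory {
    private static final Logger logger = LoggerFactory.getLogger(CustomThreadFactory.class);

    private final AtomicInteger counter = new AtomicInteger(0);
    private final String namePrefix;
    private final ThreadGroup group;

    public CustomThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
        // SecurityManager is deprecated for removal since JDK 17, so use the creating thread's group
        this.group = Thread.currentThread().getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
        // Descriptive names (app-worker-1, app-worker-2, ...) make jstack output readable
        Thread thread = new Thread(group, r, namePrefix + counter.incrementAndGet(), 0);
        thread.setDaemon(false);
        thread.setPriority(Thread.NORM_PRIORITY);
        // Uncaught exception handler: only reached by tasks passed to execute();
        // exceptions thrown by submit() tasks are captured in the returned Future instead
        thread.setUncaughtExceptionHandler((t, e) -> {
            logger.error("Uncaught exception in thread: " + t.getName(), e);
            // Send an alert (AlertManager is an application-specific component)
            AlertManager.notify(e);
        });
        return thread;
    }
}
```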
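A caveat for the uncaught-exception handler above: it only sees exceptions from tasks passed to execute(). submit() wraps the task in a FutureTask, which catches the exception and stores it in the Future, so the handler never fires and the failure surfaces only when get() is called. The self-contained sketch below demonstrates the difference with a minimal thread factory; HandlerDemo and its counters are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class HandlerDemo {
    /** Returns {handler calls from execute(), handler calls from submit(), 1 if get() surfaced the submit() failure}. */
    static int[] run() {
        AtomicInteger handlerCalls = new AtomicInteger();
        CountDownLatch handled = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "handler-demo");
            t.setUncaughtExceptionHandler((thread, e) -> {
                handlerCalls.incrementAndGet();
                handled.countDown();
            });
            return t;
        });
        try {
            // execute(): the exception escapes run() and reaches the handler
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            handled.await(5, TimeUnit.SECONDS);
            int afterExecute = handlerCalls.get();

            // submit(): the FutureTask captures the exception; the handler is not called
            Future<?> f = pool.submit((Runnable) () -> { throw new IllegalStateException("boom"); });
            int surfaced = 0;
            try {
                f.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                surfaced = (e.getCause() instanceof IllegalStateException) ? 1 : 0;
            }
            return new int[] {afterExecute, handlerCalls.get() - afterExecute, surfaced};
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In practice this means tasks sent through submit() need their own try/catch (or a wrapper such as MonitoredTask below) if failures must be logged and alerted on.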
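2. Custom rejection policy (production-grade)
```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomRejectionPolicy implements RejectedExecutionHandler {
    private static final Logger logger = LoggerFactory.getLogger(CustomRejectionPolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 1. Log the rejected task
        logger.warn("Task rejected: " + r);
        if (executor.isShutdown()) {
            // The pool no longer accepts work: persist instead of silently dropping the task
            persistTask(r);
            return;
        }
        try {
            // 2. Retry enqueueing with a timeout (this blocks the submitting thread for up to 1 second)
            boolean offered = executor.getQueue().offer(r, 1, TimeUnit.SECONDS);
            if (!offered) {
                // 3. Persist to external storage
                persistTask(r);
                logger.info("Task persisted to storage: " + r);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Re-enqueue interrupted", e);
            persistTask(r);
        }
    }

    private void persistTask(Runnable task) {
        // Persistence logic (database, file, message queue); TaskStorage is application-specific.
        // A plain Runnable is usually not serializable, so persist the task's business data instead.
        TaskStorage.save(task);
    }
}
```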
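Before writing a custom handler, it is worth knowing the four policies the JDK ships as nested classes of ThreadPoolExecutor: AbortPolicy (the default, throws RejectedExecutionException), CallerRunsPolicy, DiscardPolicy and DiscardOldestPolicy. CallerRunsPolicy is a simple form of back-pressure: the rejected task runs on the submitting thread, which slows the producer down instead of losing work. The sketch below (CallerRunsDemo is an illustrative name) saturates a one-thread pool with a one-slot queue and checks which thread ran the third task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    /** Returns true if the rejected task ran on the thread that submitted it. */
    static boolean rejectedTaskRanOnCaller() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the single queue slot
            // Pool and queue are full: CallerRunsPolicy runs this task right here, synchronously
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

The trade-off is that the caller (for example a request-handling thread) becomes slower while the pool is saturated; that is the intended throttling effect, but it should be acceptable for that caller.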
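III. Best practices for submitting and executing tasks
1. Task wrapper (with monitoring)
```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MonitoredTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MonitoredTask.class);

    private final Runnable actualTask;
    private final long submitTime;
    // MDC context captured on the submitting thread (may be null)
    private final Map<String, String> callerContext;

    public MonitoredTask(Runnable task) {
        this.actualTask = task;
        this.submitTime = System.nanoTime();
        this.callerContext = MDC.getCopyOfContextMap();
    }

    @Override
    public void run() {
        long start = System.nanoTime();
        // Metrics is an application-specific facade; recordQueueWait is a hypothetical method
        Metrics.recordQueueWait(start - submitTime);
        try {
            // Restore the caller's MDC so log lines share its traceId; create one if absent
            if (callerContext != null) {
                MDC.setContextMap(callerContext);
            }
            if (MDC.get("traceId") == null) {
                MDC.put("traceId", UUID.randomUUID().toString());
            }
            actualTask.run();
            Metrics.recordSuccess(System.nanoTime() - start);
        } catch (Exception e) {
            Metrics.recordFailure(System.nanoTime() - start);
            // Retry logic
            if (shouldRetry(e)) {
                retryTask();
            } else {
                logger.error("Task execution failed", e);
            }
        } finally {
            // Pool threads are reused, so always clear the MDC
            MDC.clear();
        }
    }

    // Placeholder retry policy (application-specific); no retries by default
    private boolean shouldRetry(Exception e) {
        return false;
    }

    private void retryTask() {
        // e.g. resubmit actualTask with a bounded retry counter
    }

    // Use this when submitting tasks
    public static void submit(ExecutorService executor, Runnable task) {
        executor.execute(new MonitoredTask(task));
    }
}
```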
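An alternative to wrapping every task is to subclass ThreadPoolExecutor and override its protected beforeExecute/afterExecute hooks, which the pool calls around every task it runs. The sketch below (TimingThreadPool is a hypothetical name) counts successes and failures and accumulates execution time with LongAdder. In afterExecute the Throwable argument is non-null only for exceptions that escaped run(), so tasks sent through submit() report failures via their Future instead.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class TimingThreadPool extends ThreadPoolExecutor {
    // Start time of the task currently running on each worker thread
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    final LongAdder completed = new LongAdder();
    final LongAdder failed = new LongAdder();
    final LongAdder totalNanos = new LongAdder();

    TimingThreadPool(int core, int max, long keepAlive, TimeUnit unit, BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.add(System.nanoTime() - startNanos.get());
            if (t == null) {
                completed.increment();
            } else {
                failed.increment();
            }
        } finally {
            startNanos.remove(); // worker threads are reused
            super.afterExecute(r, t);
        }
    }
}
```

The counters could then be exported by the monitoring code in section IV; keeping the hooks cheap matters, because they run on the worker thread for every task.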
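2. Task timeout control
```java
Future<?> future = executor.submit(task);
try {
    // Wait at most 30 seconds; note that get() blocks the calling thread
    future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // 1. Cancel the task; cancel(true) interrupts it, which only stops code that responds to interruption
    future.cancel(true);
    // 2. Log the timeout
    logger.warn("Task timed out: " + task);
    // 3. Run the fallback strategy
    fallbackHandler.handle(task);
} catch (ExecutionException e) {
    // The task itself threw; the original exception is the cause
    logger.error("Task failed", e.getCause());
} catch (InterruptedException e) {
    // Restore the interrupt flag and stop waiting
    Thread.currentThread().interrupt();
    future.cancel(true);
}
```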
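If the caller should not block on get(), Java 9 added CompletableFuture.completeOnTimeout and orTimeout. The sketch below (TimeoutDemo and the "fallback" value are illustrative) runs a slow supplier on a pool and completes with a fallback value after 100 ms. As with a timed-out get(), this does not stop the underlying task: it keeps running on its pool thread unless you also interrupt or cancel it yourself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class TimeoutDemo {
    static String callWithFallback(ExecutorService pool) {
        CompletableFuture<String> result = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(2_000); // simulate a slow downstream call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "real result";
                }, pool)
                // Complete with a fallback if no result arrives within 100 ms (Java 9+)
                .completeOnTimeout("fallback", 100, TimeUnit.MILLISECONDS);
        return result.join(); // returns after roughly 100 ms here
    }
}
```

orTimeout(100, TimeUnit.MILLISECONDS) is the variant to use when a timeout should complete the future exceptionally (with a TimeoutException) rather than with a default value.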
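IV. Thread pool monitoring and management
1. Collecting monitoring metrics
```java
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadPoolMonitor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    private final ThreadPoolExecutor executor;

    public ThreadPoolMonitor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Collect key metrics
                int activeCount = executor.getActiveCount();
                long completedTaskCount = executor.getCompletedTaskCount();
                int queueSize = executor.getQueue().size();
                int poolSize = executor.getPoolSize();
                // Publish to the monitoring system (Metrics is an application-specific facade)
                Metrics.gauge("threadpool.active.count", activeCount);
                Metrics.gauge("threadpool.pool.size", poolSize);
                Metrics.gauge("threadpool.queue.size", queueSize);
                // Cumulative total: publish as a gauge, not as a counter increment
                Metrics.gauge("threadpool.completed.tasks", completedTaskCount);
                // Warn above 80% usage; capacity = size + remaining (long avoids overflow for unbounded queues)
                long capacity = (long) queueSize + executor.getQueue().remainingCapacity();
                if (queueSize > capacity * 0.8) {
                    logger.warn("Thread pool queue is approaching capacity: {}/{}", queueSize, capacity);
                }
                // Sample every 30 seconds
                Thread.sleep(30_000);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition stops the monitor
                Thread.currentThread().interrupt();
            }
        }
    }
}
```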
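Rather than a dedicated thread looping with Thread.sleep, sampling can be scheduled on a ScheduledExecutorService, and the "approaching capacity" check is clearest as a usage ratio, size / (size + remainingCapacity). The sketch below (QueueUsageMonitor is a hypothetical name; printing stands in for your metrics or alerting calls) computes that ratio in long arithmetic so that an unbounded LinkedBlockingQueue, whose remainingCapacity() is Integer.MAX_VALUE, cannot overflow it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueUsageMonitor {
    /** Fraction of the work queue's capacity in use, from 0.0 to 1.0. */
    static double queueUsage(ThreadPoolExecutor executor) {
        long size = executor.getQueue().size();
        long capacity = size + executor.getQueue().remainingCapacity();
        // A SynchronousQueue has no capacity; report 0 instead of a spurious "full"
        return capacity == 0 ? 0.0 : (double) size / capacity;
    }

    /** Samples the pool every periodSeconds; the caller shuts the returned scheduler down. */
    static ScheduledExecutorService start(ThreadPoolExecutor executor, long periodSeconds, double warnRatio) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "threadpool-monitor");
            t.setDaemon(true); // do not keep the JVM alive just for monitoring
            return t;
        });
        // An exception escaping this body would cancel all later runs, so keep it simple
        scheduler.scheduleAtFixedRate(() -> {
            double usage = queueUsage(executor);
            System.out.printf("active=%d pool=%d queueUsage=%.2f%n",
                    executor.getActiveCount(), executor.getPoolSize(), usage);
            if (usage > warnRatio) {
                System.out.println("WARN: thread pool queue is approaching capacity");
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

A typical call would be QueueUsageMonitor.start(pool, 30, 0.8), matching the 30-second interval and 80% threshold used above.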
2. Adjusting pool parameters at runtime
```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DynamicThreadPool extends ThreadPoolExecutor {
    public DynamicThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
    }

    // Change the core thread count at runtime
    @Override
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize >= 0) {
            // Since JDK 9, core > max throws IllegalArgumentException, so raise max first
            if (corePoolSize > getMaximumPoolSize()) {
                super.setMaximumPoolSize(corePoolSize);
            }
            super.setCorePoolSize(corePoolSize);
            Metrics.gauge("threadpool.core.size", corePoolSize); // application-specific metrics facade
        }
    }

    // Change the maximum thread count at runtime
    public void setMaxPoolSize(int maxPoolSize) {
        if (maxPoolSize > 0 && maxPoolSize >= getCorePoolSize()) {
            super.setMaximumPoolSize(maxPoolSize);
            Metrics.gauge("threadpool.max.size", maxPoolSize);
        }
    }

    // Queue capacity: ThreadPoolExecutor has no setQueue(); the work queue is fixed at
    // construction. To resize it at runtime, construct the pool with a queue whose own
    // capacity can be changed, and adjust that queue instead of replacing it.
}
```
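Because ThreadPoolExecutor keeps the queue it was constructed with, runtime resizing has to happen inside the queue itself. One possible approach, sketched below, is a LinkedBlockingQueue subclass with a volatile soft capacity enforced by its offer(E); ResizableCapacityQueue is a hypothetical name. It relies on ThreadPoolExecutor.execute() enqueueing through offer(E). put() and the timed offer() are not overridden, so they bypass the limit (this includes the timed offer in the custom rejection policy from section II).

```java
import java.util.concurrent.LinkedBlockingQueue;

/** LinkedBlockingQueue whose capacity can be changed at runtime (enforced by offer(E) and add(E)). */
class ResizableCapacityQueue<E> extends LinkedBlockingQueue<E> {
    private volatile int capacity;

    ResizableCapacityQueue(int capacity) {
        super(); // the underlying queue is unbounded; the soft limit lives in offer()
        setCapacity(capacity);
    }

    final void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        // Shrinking never drops queued tasks; new offers fail until the queue drains below the limit
        this.capacity = capacity;
    }

    int getCapacity() {
        return capacity;
    }

    @Override
    public synchronized boolean offer(E e) {
        // synchronized so two concurrent offers cannot both pass the size check and overshoot
        if (size() >= capacity) {
            return false;
        }
        return super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        // Keeps monitoring code (size + remainingCapacity) consistent with the soft limit
        return Math.max(0, capacity - size());
    }
}
```

Usage would look like `new DynamicThreadPool(4, 8, 60, TimeUnit.SECONDS, queue)` with `queue = new ResizableCapacityQueue<>(100)`, followed later by `queue.setCapacity(500)`.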
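V. Graceful shutdown and resource cleanup
1. Handling application shutdown
```java
@PreDestroy
public void shutdownExecutor() {
    // 1. Stop accepting new tasks
    executor.shutdown();
    try {
        // 2. Wait for running and queued tasks to finish
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            // 3. Interrupt running tasks; shutdownNow() returns the tasks that never started
            List<Runnable> neverStarted = executor.shutdownNow();
            logger.warn("{} queued tasks were not executed", neverStarted.size());
            // 4. Wait again for tasks to respond to interruption
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                logger.error("Thread pool did not terminate");
            }
        }
    } catch (InterruptedException e) {
        // 5. Cancel everything and preserve the interrupt flag
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
    // 6. Release other resources (cleanupResources is application-specific)
    cleanupResources();
}
```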
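@PreDestroy only runs inside a container that manages the bean's lifecycle (Spring, Jakarta EE). In a plain Java application the same two-phase shutdown can be packaged as a helper and registered with Runtime.getRuntime().addShutdownHook. The sketch below (GracefulShutdown is a hypothetical name; printing stands in for logging) returns the tasks that never started so the caller can log or persist them. Keep hook timeouts short: the JVM waits for shutdown hooks, and orchestrators such as Kubernetes kill the process after their grace period.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class GracefulShutdown {
    /** shutdown(), wait, then shutdownNow() and wait again; returns tasks that never started. */
    static List<Runnable> shutdown(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return Collections.emptyList();
            }
            List<Runnable> neverStarted = executor.shutdownNow(); // also interrupts running tasks
            if (!executor.awaitTermination(timeout, unit)) {
                System.err.println("Thread pool did not terminate");
            }
            return neverStarted;
        } catch (InterruptedException e) {
            List<Runnable> neverStarted = executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return neverStarted;
        }
    }

    /** Registers a JVM shutdown hook for applications without @PreDestroy support. */
    static void registerHook(ExecutorService executor) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            List<Runnable> dropped = shutdown(executor, 10, TimeUnit.SECONDS);
            System.err.println(dropped.size() + " queued tasks were not executed");
        }, "executor-shutdown-hook"));
    }
}
```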
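2. Recovering unfinished tasks
```java
public void recoverPendingTasks() {
    // Intended for shutdown: call after executor.shutdown() so no new tasks arrive while draining
    BlockingQueue<Runnable> pendingQueue = executor.getQueue();
    List<Runnable> pendingTasks = new ArrayList<>();
    pendingQueue.drainTo(pendingTasks);
    for (Runnable task : pendingTasks) {
        // Matches only tasks passed to execute(); submit() wraps tasks in a FutureTask
        if (task instanceof RecoverableTask) {
            // Persist to reliable storage (RecoverableTask and TaskStorage are application-specific)
            TaskStorage.save((RecoverableTask) task);
            logger.info("Recovered pending task: " + task);
        } else {
            logger.warn("Pending task cannot be recovered and was dropped: " + task);
        }
    }
}
```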
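The instanceof check above only matches tasks that were passed to execute(). submit() wraps its argument in a FutureTask before queueing it, and shutdownNow() returns those wrappers, so a RecoverableTask submitted that way is not recognized. The sketch below (the RecoverableTask interface, its payload() method, and QueueContentsDemo are hypothetical) shows what actually sits in the queue in each case. One way around it is to keep recoverable work on execute(), or to track pending task data yourself alongside the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical marker for tasks whose data can be persisted and replayed later. */
interface RecoverableTask extends Runnable {
    String payload();
}

class QueueContentsDemo {
    /** Returns {number of pending tasks, number recognized as RecoverableTask}. */
    static int[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        // Keep the only worker busy so the next tasks stay queued
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        RecoverableTask task = new RecoverableTask() {
            @Override public void run() { }
            @Override public String payload() { return "order-42"; }
        };
        pool.execute(task); // queued as-is
        pool.submit(task);  // queued wrapped in a FutureTask
        List<Runnable> pending = new ArrayList<>(pool.shutdownNow()); // also interrupts the busy worker
        release.countDown();
        int recognized = 0;
        for (Runnable r : pending) {
            if (r instanceof RecoverableTask) {
                recognized++;
            }
        }
        return new int[] {pending.size(), recognized};
    }
}
```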
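VI. Production recommendations
- Thread isolation:
  - CPU-bound tasks: a dedicated pool
  - I/O-bound tasks: a dedicated pool
  - Critical business paths: dedicated pools, so one workload cannot exhaust the threads another depends on
- Resource limits: cap concurrent access to a scarce resource with a Semaphore. A blocked acquire() also ties up a pool thread, so keep the permit count consistent with the pool size.
```java
// Field: at most 50 concurrent users of the limited resource
private final Semaphore concurrencySemaphore = new Semaphore(50);

// Inside a method:
executor.execute(() -> {
    try {
        // Acquire outside the try/finally that releases, so a failed acquire never releases a permit
        concurrencySemaphore.acquire();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
    }
    try {
        // Perform the rate-limited operation
    } finally {
        concurrencySemaphore.release();
    }
});
```
- Context propagation: a plain ThreadLocal, and even an InheritableThreadLocal (copied only when a thread is created), does not reach reused pool threads; TransmittableThreadLocal from Alibaba's transmittable-thread-local library handles this.
```java
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
context.set("value-from-parent");
executor.execute(TtlRunnable.get(() -> {
    // Sees the value captured when TtlRunnable.get() was called on the parent thread
    String value = context.get();
}));
```
- Circuit breaking and fallback:
```java
// CircuitBreaker is illustrative (an in-house class or a library such as Resilience4j)
CircuitBreaker circuitBreaker = new CircuitBreaker();
executor.execute(() -> {
    if (circuitBreaker.allowExecution()) {
        try {
            // Business logic
            circuitBreaker.recordSuccess();
        } catch (Exception e) {
            circuitBreaker.recordFailure();
            logger.error("Protected call failed", e);
        }
    } else {
        // Fallback logic
        fallbackService.executeFallback();
    }
});
```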
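The CircuitBreaker in the circuit-breaking snippet above is a placeholder; libraries such as Resilience4j provide production implementations. Purely to illustrate the state machine behind allowExecution/recordSuccess/recordFailure, here is a minimal consecutive-failure breaker. SimpleCircuitBreaker, its threshold, and its open interval are illustrative assumptions, and it omits a real half-open state (once the interval passes, every call is let through until one succeeds or fails).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class SimpleCircuitBreaker {
    private final int failureThreshold;
    private final long openMillis;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    // Wall-clock time (ms) at which the breaker opened; 0 means closed
    private final AtomicLong openedAt = new AtomicLong();

    SimpleCircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    /** Closed: allow. Open: reject until openMillis has elapsed, then allow calls through to probe. */
    boolean allowExecution() {
        long opened = openedAt.get();
        return opened == 0 || System.currentTimeMillis() - opened >= openMillis;
    }

    void recordSuccess() {
        consecutiveFailures.set(0);
        openedAt.set(0); // close the breaker
    }

    void recordFailure() {
        if (consecutiveFailures.incrementAndGet() >= failureThreshold) {
            openedAt.set(System.currentTimeMillis()); // open, or re-open after a failed probe
        }
    }
}
```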
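VII. Common problems and remedies
| Problem | Symptom | Remedies |
|---|---|---|
| Thread leak | Thread count keeps growing | 1. Check that threads terminate normally 2. Monitor thread creation 3. Cap the maximum thread count |
| Task backlog | Queue length keeps growing | 1. Add consumer threads 2. Speed up task processing 3. Degrade or shed tasks |
| High CPU usage | CPU stays saturated | 1. Analyze thread dumps (jstack) 2. Optimize hot code paths 3. Limit the pool size |
| Task starvation | Low-priority tasks wait indefinitely | 1. Use a priority queue (note that PriorityBlockingQueue is unbounded) 2. Split pools by priority 3. Implement fair scheduling |
| Lost context | Worker threads cannot see the caller's context | 1. Use TransmittableThreadLocal 2. Pass context explicitly 3. Copy the MDC context into each task |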
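Remedy 2 in the last row, passing context explicitly, comes down to capturing a value on the submitting thread and restoring it around the task on the worker thread; MDC.getCopyOfContextMap()/setContextMap() do the same for log context. The sketch below (ContextPropagation and REQUEST_ID are hypothetical names) does this for a plain ThreadLocal and restores the worker's previous value afterwards, since pool threads are reused.

```java
class ContextPropagation {
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    /** Captures REQUEST_ID now (on the caller) and re-installs it while the task runs. */
    static Runnable wrap(Runnable task) {
        String captured = REQUEST_ID.get();
        return () -> {
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured);
            try {
                task.run();
            } finally {
                // Pool threads are reused: put back whatever was there before
                if (previous == null) {
                    REQUEST_ID.remove();
                } else {
                    REQUEST_ID.set(previous);
                }
            }
        };
    }
}
```

Usage: `executor.execute(ContextPropagation.wrap(task))`. Only the wrapped task sees the caller's value; an unwrapped task on the same worker sees nothing.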
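In short, running thread pools in production means balancing resource configuration, task management, error handling, and monitoring and alerting. Choose strategies that fit your specific workload, and back them with thorough monitoring and alerting.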