在传统的 JDK 线程池中,开发者往往面临核心线程数(CoreSize)难以确定的困境:
- 核心线程过少:无法充分利用多核性能,IO 等待时吞吐量急剧下降。
- 核心线程过多:上下文切换开销巨大,甚至可能导致系统响应变慢。
RXlib 的 ThreadPool 采用基于负载的自适应动态调整策略:
核心公式:
最佳线程数 = CPU 线程数 * (1 + CPU 等待时间 / CPU 执行时间)
- 负载监控:实时监控 CPU 使用率与任务队列深度。
- 动态调优:
- 当队列已满且 CPU 使用率 < 40% 时,自动分批增加
maxThreads以提升并发处理能力。 - 当任务积压减少或 CPU 使用率 > 60% 时,自动收缩线程数以减少系统负荷,防止过度竞争。
- 当队列已满且 CPU 使用率 < 40% 时,自动分批增加
- 阻塞反馈:当达到最大线程数且队列依然撑爆时,会产生背压(Back-pressure)。默认
BLOCK模式保持旧行为,提交线程会等待队列释放 slot;也可以切换为超时拒绝或 caller-runs。
rxlib 的线程池通过扩展 RunFlag 枚举,提供了比原生 JDK 更精细的任务控制流。
| 运行标志 | 功能描述 | 适用场景 |
|---|---|---|
SINGLE |
唯一执行:基于 taskId 检查。若已有相同 ID 的任务在运行,则当前任务直接跳过。 |
重复触发的刷新操作、互斥逻辑。 |
SERIAL |
串行分发:基于 taskId 进行串行队列化。采用无锁 CompletableFuture 生成任务链,不会阻塞物理线程。 |
需要严格顺序处理的会话消息、日志记录。 |
TRANSFER |
移交执行:阻塞提交线程,直到任务被工作线程接手或成功存入队列。 | 关键任务流控,防止生产速度失控。 |
PRIORITY |
优先执行:若当前无空闲线程且队列已满,强制新建一个临时线程处理。 | 紧急状态上报、高优监控。 |
INHERIT_FAST_THREAD_LOCALS |
环境继承:任务执行时复制父线程 FastThreadLocal 环境,结束后恢复旧 map。 |
链路参数透传、用户权限上下文传递。 |
THREAD_TRACE |
链路追踪:开启异步 Trace,关联后续的所有异步调用流。 | 复杂异步系统全链路排障。 |
ThreadPool pool = Tasks.nextPool();
AtomicInteger counter = new AtomicInteger();
// 1. SINGLE 模式:确保同一时间只有一个执行,避免冗余
pool.run(() -> {
log.info("Do unique task...");
sleep(1000);
}, "task-unique-id", RunFlag.SINGLE.flags());
// 2. SERIAL 模式:非阻塞串行队列,任务按序执行
for (int i = 0; i < 5; i++) {
int seq = i;
pool.runAsync(() -> {
log.info("Batch seq: {}", seq);
sleep(500);
return null;
}, "serial-id", RunFlag.SERIAL.flags());
}ThreadPool.ThreadQueue 使用 LinkedTransferQueue + Semaphore 控制容量。默认配置保留历史阻塞语义:
app:
threadPool:
queueOfferMode: BLOCK
queueOfferTimeoutMillis: 0可选模式:
| 模式 | 行为 | 建议场景 |
|---|---|---|
BLOCK |
队列满时一直等待 slot,形成强背压。 | 后台批处理、允许提交线程阻塞的场景。 |
TIMEOUT_REJECT |
队列满后等待 queueOfferTimeoutMillis,超时抛 RejectedExecutionException。 |
Netty EventLoop、低延迟入口、不可无限阻塞的链路。 |
CALLER_RUNS |
队列满且等待超时后由提交线程执行溢出任务。 | 希望快速消化突发但能接受提交线程承担执行成本的场景。 |
高性能网络链路中,避免在 Netty EventLoop 上使用可能无限阻塞的提交路径;如果必须从 I/O 线程提交任务,优先配置 TIMEOUT_REJECT 并在上层做限流、降级或丢弃策略。
CALLER_RUNS 会把执行成本转移到提交线程;当前实现会走与 worker 线程一致的 beforeExecute/afterExecute 生命周期,覆盖 SINGLE、THREAD_TRACE、INHERIT_FAST_THREAD_LOCALS 和 taskMap 清理。INHERIT_FAST_THREAD_LOCALS 使用复制后的 InternalThreadLocalMap.indexedVariables 执行,任务内修改 FastThreadLocal 不会污染提交线程后续上下文。
SERIAL 按 taskId 使用 CompletableFuture 链串行,不占用工作线程等待。每个 taskId 的链长度受以下配置限制:
app:
threadPool:
serialQueueCapacity: 4096
serialQueueHardLimit: 100000超过容量会快速抛 RejectedExecutionException,避免单个热 key 积压大量 CompletableFuture 对象。serialQueueHardLimit 是最终保护上限,serialQueueCapacity 不会超过该值。
单个 SERIAL 任务异常只影响自己的 Future,不会阻断后续同 taskId 任务;链尾完成后会清理 taskSerialMap / taskSerialCountMap。
Tasks.executor() 是全局共享入口,Tasks.shutdown() / Tasks.shutdownNow() 以及 Tasks.executor().shutdown() / shutdownNow() 均为 no-op 包装语义,不承担释放底层共享线程池、timer 或 watchman 的职责。
独立创建的 new ThreadPool(...) / ThreadPool.fixed(...) 自己管理生命周期:shutdown() 和 shutdownNow() 会先从 CpuWatchman 注销,再进入 JDK 线程池关闭流程;关闭后再次 execute() / submit() / run() / runAsync() 会抛 RejectedExecutionException,调用方不能依赖 warn + ignore 语义。
全局入口和独立实例的生命周期必须区分:Tasks.executor() 只是共享线程池包装器,关闭方法不会释放共享资源;standalone ThreadPool 是资源拥有者,关闭后即进入拒绝新任务状态。
ThreadQueue 的 drainTo() / clear() / shutdownNow() 路径会按实际移除数量释放 slot,并清理未执行任务的 taskMap 映射,避免 counter 与 availableSlots 不一致。
List<Func<Integer>> tasks = Arrays.asList(
() -> { sleep(500); return 1; },
() -> { sleep(200); return 2; }
);
// 异步等待所有结束
ThreadPool.MultiTaskFuture<Void, Integer> mf = pool.runAllAsync(tasks);
mf.getFuture().join(); // 阻塞至全链路结束RXlib 线程池不仅支持上下文传递,还深度集成了异步 Trace 功能,支持跨越 Executor、WheelTimer 的链路追踪。
无 executor 参数的 CompletableFuture.xxAsync() 默认会由 RXlib 尝试 patch 到 Tasks.executor(),以继承并清理 traceId。该能力会修改 JDK 全局静态字段,如需禁用可配置 app.threadPool.patchCompletableFutureAsyncPool=false;新代码仍建议显式传入 Tasks.executor() 或使用 ThreadPool.runAsync()。
Spring 管理的 ThreadPoolTaskExecutor 会自动套用 trace 任务装饰器;确需使用非 RXlib、非 Spring 管理的原生线程池时,可用 ThreadPool.Task 做一次轻量包装:
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute((Runnable) ThreadPool.Task.adapt((Runnable) () -> doWork()));
Future<String> future = executor.submit((Callable<String>) ThreadPool.Task.adapt((Callable<String>) () -> query()));// 初始化 Trace 配置
RxConfig.INSTANCE.getThreadPool().setTraceName("rx-traceId");
ThreadPool.traceIdGenerator = () -> UUID.randomUUID().toString().replace("-", "");
// 开启追踪
ThreadPool.startTrace(null);
pool.runAsync(() -> {
log.info("Step 1 (Main Process)");
pool.runAsync(() -> {
log.info("Step 2 (Self-contained callback)");
});
});
ThreadPool.endTrace();相比 JDK 默认固定大小的线程模型,RXlib 的定时线程池支持自适应 CoreSize 调整。当定时任务中存在大量阻塞 IO 时,它能伸缩其线程规模,保证其他定时任务不会因前面的阻塞而延期执行。
RXlib 对 Netty 的 HashedWheelTimer 进行了封装,核心点在于:只做调度,不做执行。
- 调度效率:时间轮算法在具有大量定时器时通过单线程调度极其高效。
- 并发执行:所有触发的任务都会立即异步移交给
ThreadPool执行,避免了传统时间轮“一处任务阻塞,整体调度停滞”的顽疾。 - 关闭语义:
WheelTimer.shutdown()会拒绝新任务,并取消未执行 timeout task、带 taskId 的 holder task 以及 periodic task,同时停止底层 NettyHashedWheelTimer;awaitTermination()在底层 timer 已停止且 holder、active timeout、periodic 集合清空后返回 true。更多 shutdown 语义及与外部 executor running task 的边界见 WheelTimerShutdown.md。
WheelTimer timer = Tasks.timer();
// 设置一个重复执行的任务
timer.setTimeout(() -> {
log.info("Heartbeat...");
asyncContinue(true); // 返回 true 表示继续下次循环
}, 1000, "heartbeat-task", TimeoutFlag.PERIOD.flags());DiagnosticMetrics 开启后,线程池会输出核心指标:
| 指标 | 含义 |
|---|---|
rx.thread_pool.core.count |
当前 corePoolSize。 |
rx.thread_pool.size.count |
当前线程池线程数。 |
rx.thread_pool.active.count |
正在执行任务的线程数。 |
rx.thread_pool.queue.count |
当前队列长度。 |
rx.thread_pool.queue.capacity |
队列容量。 |
rx.thread_pool.queue.remaining |
队列剩余 slot。 |
rx.thread_pool.completed.count |
已完成任务数。 |
rx.thread_pool.task.rejected.count |
线程池拒绝任务数。 |
rx.thread_pool.queue.offer.block.count |
提交线程遇到满队列并等待的次数。 |
rx.thread_pool.queue.offer.block.millis |
提交线程累计等待耗时。 |
rx.thread_pool.queue.offer.block.max.millis |
单次最大等待耗时。 |
rx.thread_pool.queue.offer.rejected.count |
队列 offer 超时拒绝次数。 |
rx.thread_pool.queue.offer.caller_runs.count |
caller-runs 溢出执行次数。 |
rx.thread_pool.serial.chain.count |
当前串行 taskId 链数量。 |
rx.thread_pool.serial.rejected.count |
SERIAL 容量拒绝次数。 |
rx.thread_pool.single.skip.count |
SINGLE 重复任务跳过次数。 |
rx.thread_pool.config.invalid.count |
线程池配置解析失败次数,当前用于 queueOfferMode 未知值。 |
rx.thread_pool.cpu_load.invalid.count |
CPU load 采样为 NaN 或负数时跳过 resize 的次数。 |
rx.thread_pool.resize.cooldown.skipped.count |
resize 因 cooldown 被跳过的次数。 |
rx.wheel_timer.active.count |
当前 active timeout task 数量。 |
rx.wheel_timer.holder.count |
当前带 taskId 的 holder 数量。 |
rx.wheel_timer.periodic.count |
当前 periodic task 数量。 |
rx.wheel_timer.pending.count |
Netty 时间轮 pending timeout 数量。 |
网络项目还必须同时关注 JVM 堆外内存和 Netty allocator 指标,尤其是 direct memory 使用量、连接数、吞吐、P99/P999 延迟、写队列水位与拒绝/降级次数。
rxlib 提供了一个高性能、自适应的通用对象池实现 (ObjectPool<T>),旨在解决高并发场景下频繁创建/销毁对象带来的 GC 压力与锁竞争。
- L1 级线程本地缓存:
利用
FastThreadLocal实现 L1 级缓存。当同一线程频繁borrow/recycle时,优先从本线程私有的缓存位获取,完全无锁且避免了对全局队列的竞争。 - 自适应预填充 (Adaptive Refill):
池化组件会定期采样借用频率。当负载升高时,它会根据
demandFactor自动计算并预创建对象(暖机);当负载下降时,自动回收空闲对象。 - 无锁状态管理:
对象的
IDLE、BORROWED、RETIRED状态切换全部通过 CAS (Compare-And-Swap) 实现,最大程度减少上下文切换。 - 健康检测机制:
- 校验(Validation):借出前、归还后均可进行活性检查。
- 泄漏检测(Leak Detection):定时扫描借出超时的对象并记录堆栈,帮助定位未归还对象的代码块。
| 特性 | 实现方式 | 优势 |
|---|---|---|
| 高并发性能 | FastThreadLocal L1 + 无锁 CAS | 借还操作延迟极低,热点数据访问基本无竞争。 |
| 负载适应度 | 滑动窗口采样 + 动态预填充 | 流量洪峰来临时减少现场创建对象的开销。 |
| 内存安全性 | 自动状态回收 + 泄漏堆栈追踪 | 配合 TraceHandler 能够快速发现资源泄漏。 |
// 初始化对象池:最小 2,最大 20
ObjectPool<MyClient> pool = new ObjectPool<>(2, 20,
() -> new MyClient(), // createHandler
client -> client.isAlive() // validateHandler
);
// 借用对象
MyClient client = pool.borrow();
try {
client.execute();
} finally {
// 归还对象
pool.recycle(client);
}- 基于 Netty
FastThreadLocal:显著优于 JDK 原生ThreadLocal的访问性能。 - 自研
IntWaterMark负载算法:精准控制核心线程规模。 - 无锁化串行逻辑:ThreadPool 采用
CompletableFuture无锁链,ObjectPool 采用 CAS 状态机,共同构建了高性能的并发基石。 - 背压一致性设计:通过任务队列和借用超时反馈,天然支持流量整形。