找回密码
 立即注册
首页 业界区 业界 【Java】线程池源码解析

【Java】线程池源码解析

糙昧邵 2025-11-27 01:40:00
点进ThreadPoolExectutor源码ThreadPoolExecutor.java可以看到线程池的核心实现;
ThreadPoolExecutor

首先是ThreadPoolExecutor()里面可以看到线程池核心参数
  1. public ThreadPoolExecutor(int corePoolSize,
  2.                           int maximumPoolSize,
  3.                           long keepAliveTime,
  4.                           TimeUnit unit,
  5.                           BlockingQueue<Runnable> workQueue,
  6.                           ThreadFactory threadFactory,
  7.                           RejectedExecutionHandler handler) {
  8.     if (corePoolSize < 0 ||
  9.         maximumPoolSize <= 0 ||
  10.         maximumPoolSize < corePoolSize ||
  11.         keepAliveTime < 0)
  12.         throw new IllegalArgumentException();
  13.     if (workQueue == null || threadFactory == null || handler == null)
  14.         throw new NullPointerException();
  15.     this.acc = System.getSecurityManager() == null ?
  16.             null :
  17.             AccessController.getContext();
  18.     this.corePoolSize = corePoolSize;
  19.     this.maximumPoolSize = maximumPoolSize;
  20.     this.workQueue = workQueue;
  21.     this.keepAliveTime = unit.toNanos(keepAliveTime);
  22.     this.threadFactory = threadFactory;
  23.     this.handler = handler;
  24. }
复制代码
wc记录的是工作线程数,timed 标记的是“当前这个线程是否受超时限制”
再看下里面try内逻辑:

  • 如果 timed == true(通常是因为 wc > corePoolSize),说明当前线程属于“多出来的非核心线程”。
  • 那么它去队列拿任务时,用 poll(keepAliveTime)。
  • 关键点:如果在 keepAliveTime 时间内没拿到任务(返回 null),下一次循环时 timedOut 就会变为 true,进而导致返回 null,最终导致这个 Worker 退出循环被销毁。
总结:是先“等待超时拿到 null”,然后才“被回收”。
execute()

再看下execute()方法
  1. private final class Worker
  2.     extends AbstractQueuedSynchronizer
  3.     implements Runnable
  4. {
  5.     // 省略部分方法和变量
  6.     /** Thread this worker is running in.  Null if factory fails. */
  7.     final Thread thread;
  8.     /** Initial task to run.  Possibly null. */
  9.     Runnable firstTask;
  10.     Worker(Runnable firstTask) {
  11.         setState(-1); // inhibit interrupts until runWorker
  12.         this.firstTask = firstTask;
  13.         this.thread = getThreadFactory().newThread(this);
  14.     }
  15.     /** Delegates main run loop to outer runWorker  */
  16.     public void run() {
  17.         runWorker(this);
  18.     }  
  19. }
复制代码
无论是看注释还是看if-else那块代码都能理解:
如果任务数小于核心线程数,那就创建核心线程
如果线程池正在运行,尝试将任务加入队列(workQueue.offer(command))
成功后需要二次检查:

  • 如果线程池已关闭,移除任务并拒绝
  • 如果没有线程了,创建新线程
你可能会疑惑,为什么任务已经入队了,还要判断 workerCount == 0 并可能创建一个空任务线程?

  • 原因:假设核心线程数(Core)设为 0,或者在任务入队的瞬间,现有的线程刚好都挂了(抛异常)或者都超时销毁了。
  • 后果:如果这里不检查,任务孤零零地躺在队列里,永远没人去取它,导致“死锁”般的假死状态。
  • 作用:兜底策略,确保只要队列里有任务,就至少有一个线程活着去处理它。
如果队列满了

  • 尝试创建非核心线程(addWorker(command, false))
  • 如果失败(达到最大线程数),拒绝任务
再关注一下另外两个的重要的方法;
prestartCoreThread()
  1. Worker(Runnable firstTask) {
  2.     setState(-1); // inhibit interrupts until runWorker
  3.     this.firstTask = firstTask;
  4.     this.thread = getThreadFactory().newThread(this);
  5. }
复制代码
prestartAllCoreThreads()
  1. final void runWorker(Worker w) {
  2.     Thread wt = Thread.currentThread();
  3.     Runnable task = w.firstTask;
  4.     w.firstTask = null;
  5.     w.unlock(); // allow interrupts
  6.     boolean completedAbruptly = true;
  7.     try {
  8.         while (task != null || (task = getTask()) != null) {
  9.             w.lock();
  10.             // If pool is stopping, ensure thread is interrupted;
  11.             // if not, ensure thread is not interrupted.  This
  12.             // requires a recheck in second case to deal with
  13.             // shutdownNow race while clearing interrupt
  14.             if ((runStateAtLeast(ctl.get(), STOP) ||
  15.                  (Thread.interrupted() &&
  16.                   runStateAtLeast(ctl.get(), STOP))) &&
  17.                 !wt.isInterrupted())
  18.                 wt.interrupt();
  19.             try {
  20.                 beforeExecute(wt, task);
  21.                 Throwable thrown = null;
  22.                 try {
  23.                     task.run();
  24.                 } catch (RuntimeException x) {
  25.                     thrown = x; throw x;
  26.                 } catch (Error x) {
  27.                     thrown = x; throw x;
  28.                 } catch (Throwable x) {
  29.                     thrown = x; throw new Error(x);
  30.                 } finally {
  31.                     afterExecute(task, thrown);
  32.                 }
  33.             } finally {
  34.                 task = null;
  35.                 w.completedTasks++;
  36.                 w.unlock();
  37.             }
  38.         }
  39.         completedAbruptly = false;
  40.     } finally {
  41.         processWorkerExit(w, completedAbruptly);
  42.     }
  43. }
复制代码

  • 核心机制:private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

首先注意到这两个方法内部都调用了:
Java
  1. private Runnable getTask() {
  2.     boolean timedOut = false; // Did the last poll() time out?
  3.     for (;;) {
  4.         int c = ctl.get();
  5.         int rs = runStateOf(c);
  6.         // Check if queue empty only if necessary.
  7.         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8.             decrementWorkerCount();
  9.             return null;
  10.         }
  11.         int wc = workerCountOf(c);
  12.         // Are workers subject to culling?
  13.         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14.         if ((wc > maximumPoolSize || (timed && timedOut))
  15.             && (wc > 1 || workQueue.isEmpty())) {
  16.             if (compareAndDecrementWorkerCount(c))
  17.                 return null;
  18.             continue;
  19.         }
  20.         try {
  21.             Runnable r = timed ?
  22.                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  23.                 workQueue.take();
  24.             if (r != null)
  25.                 return r;
  26.             timedOut = true;
  27.         } catch (InterruptedException retry) {
  28.             timedOut = false;
  29.         }
  30.     }
  31. }
复制代码

  • null: 这里的 firstTask 是空。

    • 回忆一下之前看的 runWorker 源码:如果 firstTask 为空,线程启动后就不会立即执行任务,而是直接进入 while 循环调用 getTask()。
    • getTask() 会调用 workQueue.take(),导致该线程在队列上阻塞等待。

  • true: 表示创建的是核心线程。
结论:private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
} 的作用就是——招一个工人,让他没事干先去仓库门口等着,随时准备干活。
2. prestartCoreThread() —— 启动一个

Java
  1. public boolean prestartCoreThread() {        // 1. 检查当前线程数是否小于核心数         // 2. 尝试创建一个空任务的核心线程         return workerCountOf(ctl.get()) < corePoolSize && private Runnable getTask() {
  2.     boolean timedOut = false; // Did the last poll() time out?
  3.     for (;;) {
  4.         int c = ctl.get();
  5.         int rs = runStateOf(c);
  6.         // Check if queue empty only if necessary.
  7.         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8.             decrementWorkerCount();
  9.             return null;
  10.         }
  11.         int wc = workerCountOf(c);
  12.         // Are workers subject to culling?
  13.         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14.         if ((wc > maximumPoolSize || (timed && timedOut))
  15.             && (wc > 1 || workQueue.isEmpty())) {
  16.             if (compareAndDecrementWorkerCount(c))
  17.                 return null;
  18.             continue;
  19.         }
  20.         try {
  21.             Runnable r = timed ?
  22.                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  23.                 workQueue.take();
  24.             if (r != null)
  25.                 return r;
  26.             timedOut = true;
  27.         } catch (InterruptedException retry) {
  28.             timedOut = false;
  29.         }
  30.     }
  31. }; }
复制代码

  • 理解:如果核心线程还没满,就提前启动一个核心线程。
  • 返回值:如果成功启动了一个线程,返回 true;如果核心线程早已满了,返回 false。
3. prestartAllCoreThreads() —— 全部启动

Java
  1. public int prestartAllCoreThreads() {        int n = 0;        // 只要 addWorker 返回 true(说明还没满),就一直循环创建        while (private Runnable getTask() {
  2.     boolean timedOut = false; // Did the last poll() time out?
  3.     for (;;) {
  4.         int c = ctl.get();
  5.         int rs = runStateOf(c);
  6.         // Check if queue empty only if necessary.
  7.         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8.             decrementWorkerCount();
  9.             return null;
  10.         }
  11.         int wc = workerCountOf(c);
  12.         // Are workers subject to culling?
  13.         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14.         if ((wc > maximumPoolSize || (timed && timedOut))
  15.             && (wc > 1 || workQueue.isEmpty())) {
  16.             if (compareAndDecrementWorkerCount(c))
  17.                 return null;
  18.             continue;
  19.         }
  20.         try {
  21.             Runnable r = timed ?
  22.                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  23.                 workQueue.take();
  24.             if (r != null)
  25.                 return r;
  26.             timedOut = true;
  27.         } catch (InterruptedException retry) {
  28.             timedOut = false;
  29.         }
  30.     }
  31. })            ++n;        return n;}
复制代码

  • 理解:不管现在有多少线程,只要没达到 corePoolSize,就一口气把剩下的坑位全填满,让所有核心线程全部就位待命。
  • 返回值:返回这次一共新启动了多少个线程。
典型应用场景:


  • 高并发系统的启动时刻:比如“双11”零点,流量瞬间铺天盖地。如果这时候再去创建线程,线程创建的开销可能会导致系统卡顿(Jitter)。使用 prestartAllCoreThreads() 可以在流量到达前先把线程池填满。
  • 低延迟敏感系统:为了避免请求第一次处理时的抖动(Latency Spike),提前预热。
总结


  • 入口:ThreadPoolExecutor 构造参数(配置)。
  • 调度:execute()(决策:是建线程还是入队)。
  • 载体:Worker(封装了 Thread 和 Runnable)。
  • 引擎:runWorker()(死循环:取任务 -> 执行 -> 统计)。
  • 油箱:getTask()(阻塞队列取货,决定线程生死)。
程池的核心本质:
线程池__不仅仅是 new Thread() 的集合,它更是一个 “生产者-消费者” 模型。

  • 生产者:__execute() 方法,负责把任务“生产”出来并推送到队列或直接交给工人。
  • 消费者:__Worker 线程,负责从队列这个“缓冲区”里不断 getTask() 并消费。
  • 管理者:__ThreadPoolExecutor 持有 ctl 状态,动态控制工人的数量(招人/裁员)。
1、如果核心线程数设置为0会发生啥?

corePoolSize = 0 _时,线程池__就像一个“全兼职”_的机构。

  • 任务来了先尝试进队列。
  • 如果进了队列,必须兜底检查是否有线程活着,如果没有,通过 addWorker(null, false) 创建一个临时工(非核心线程)来处理队列里的活。注意这里必须用 false__,因为核心编制(Core)是0,只能招临时工(Max)。
  • 这个临时工线程在 keepAliveTime _超时后会被销毁,_线程池__最终会变回空状态。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册