目录
1 线程池引入
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。 所以,线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
2 线程池ThreadPooleExecutor介绍
2.1 组成
一个线程池包括以下四个基本组成部分:
- 线程池管理器(ThreadPoolExcutor):(1)用于创建并管理线程池:包括 创建线程池,销毁线程池;(2)管理worker,线程池自己创建了一批Worker,将线程池创建的一个线程封装成一个worker;(3)管理阻塞对列BlockingQueue;
- 工作线程(Worker):线程池中线程,被封装成一个worker,循环从阻塞队列中获取任务,然后执行任务;在线程池中通过一个HashSet来存储线程:
1 |
private final HashSet<Worker> workers = new HashSet<Worker>(); |
- 阻塞队列(BlockingQueue):用于存放没有处理的任务task。worker线程从这个阻塞队列获取任务task。
- ThreadFactory,生成线程池的线程
- RejectedExecutionHandler,处理没有执行的任务Runable
2.2 线程池状态
包含五种状态:
(1)RUNNING: Accept new tasks and process queued tasks;
- 接受新任务;
- 并处理阻塞对列中任务
(2)SHUTDOWN: 通过shutDown()操作来修改线程池的状态为此状态
- 不接受新任务;
- 终止闲置的worker线程;但是不中断正在执行任务的worker线程,所以阻塞对列中任务仍然可以被处理
(3)STOP: 通过shutDownNow()操作来修改线程池的状态为此状态
- 不接受任务;
- 终止所有worker线程(包括闲置和不闲置)
- 清空阻塞队列,将队列中任务list作为shutDownNow的函数
(4)TIDYING: 满足如下两个条件任意一个就可以将如下两种状态转变成TIDYING
- SHUTDOWN状态 && 阻塞对列为0 && worker线程都应该被中断;
- STOP状态
(5)TERMINATED: 在tryTerminate函数中可以查看,只要设置TIDYING成立,那么此时就执行terminate()操作,然后设置状态为TERMINAMTED
3 线程池初始化
我们可以通过ThreadPoolExecutor来创建一个线程池。
(1) 方式1 不需要定义ThreadFactory
1 2 3 4 5 6 7 8 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } |
(2)方式2 不需要定义RejectedExecutionHandler
1 2 3 4 5 6 7 8 9 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } |
(3) 方式3 不需要定义ThreadFactory
1 2 3 4 5 6 7 8 9 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } |
(4) 六个参数都需要
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } |
创建一个线程池需要输入六个参数:
1. corePoolSize(线程池的基本大小)
(1)当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
(2)如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
2. BlockingQueue<Runnable>阻塞对列
用于保存等待执行的任务。 可以选择以下几个阻塞队列。
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列【无边界队列】,此队列按FIFO (先进先出) 排序元素。
1 |
因为只有对列满时,才可能创建比核心线程数多的线程,而这个队列是无边界的。,所以线程池使用这种对列时,线程池最大个数就是核心线程数 |
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
3. keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
4. maximumPoolSize(线程池最大大小)
线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
5. ThreadFactory 用于设置创建线程的工厂
可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
6. RejectedExecutionHandler(饱和策略)
如在添加任务时,发现阻塞队列已经满了,而且worers线程数目达到最大值,此时就会执行如下策略:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public void execute(Runnable command) { ...... if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); ... } |
这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK提供的五种策略:
- AbortPolicy:直接抛出异常。默认的对列
- DiscardPolicy:函数为空:表示不处理,丢弃掉这个任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行execute来执行当前任务。
- CallerRunsPolicy:只用调用者所在线程来运行任务,即此时如果每一个任务都通过这个方式来执行,那么此时就和之前的单线程执行所有的任务没有什么区别。
- 自定义。根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
4 执行一个任务
1. 流程分析
从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
首先,线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
其次,线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
最后,线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
2 代码分析
上面的流程分析让我们很直观的了解了线程池的工作原理,让我们再通过源代码来看看是如何实现的。线程池执行任务的方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 1.如果线程数小于基本线程数,则创建线程并执行当前任务 if (addWorker(command, true)) return; c = ctl.get(); } //2.如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3.如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。 else if (!addWorker(command, false)) reject(command); } |
5 关闭线程池两种方式
我们可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池,它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别
- shutdownNow (1)将线程池的状态设置成STOP;(2)中断所有的正在执行或等待执行任务的woker线程;(3)清空所有的任务队列中的任务。
- shutdown (1)将线程池的状态设置成SHUTDOWN状态;(2)中断所有等待执行任务的woker线程,但不中断正在执行任务的worker线程。
1. shutdown
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 1.设置线程池的状态为SHUTDOWN advanceRunState(SHUTDOWN); // 2.中断IDLE类型的线程,不终止正在运行线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } |
2、shutdownNow
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 1.设置线程池状态为STOP advanceRunState(STOP); // 2. 终止所有线程(等待执行任务和执行任务的线程) interruptWorkers(); // 3.清空任务队列中任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } |
6 worker
6.1 worker新增和运行时间点
1、新增
在执行任务时,才会新增worker;在初始化线程池时,不进行新增worker。
2、运行
只要新增worker,就启动次线程。
6.2 如何保证线程池一直运行
每一次创建一个worker时,就会启动这个worker线程,这个线程会不断从阻塞对列中获取任务runtable,然后执行runtalbe.run()。在新增一个worker时,执行addWorker()函数,在这个函数中执行t.start()方法,创建一个worker线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private boolean addWorker(Runnable firstTask, boolean core) { ............... w = new Worker(firstTask); final Thread t = w.thread; ................ if (workerAdded) { // 执行了一个线程,这个线程执行这个worker中对应的run方法 t.start(); workerStarted = true; } ............... ............... } |
6.3 如何keepAliveTime来终止线程
keepAliveTime作用,参考源码中注释:
If the pool currently has more than corePoolSize threads,excess threads will be terminated if they have been idle for more than the keepAliveTime
对于一个worker线程,执行runWorker() 操作,在runWorker时会循环从阻塞对列中获取一个任务Runable,如果此时获取为null就执行processWorkerExit,runWorker代码如下:
1 2 3 4 5 6 |
// 这个方法就是不断的扫描阻塞对列中任务Runable,然后执行Runable.run() final void runWorker(Worker w) { // 1.循环从阻塞队列获取任务 // 2.当阻塞对列中没有了任务Runalbe,此时执行processWorkerExit() } |
1、 查找超时的IDLE THREAD
获取任务Runable代码如下,keepaliveTime作用就体现这个函数中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); // 1.设置是否需要进行超时处理 timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { Runnable r = timed ? // 2.如果需要设置超时处理,测此时就要采用这个方法来获取任务,这里正是keepAliveTime的使用地方,如果在这个时间内没有获取任务,就说明在keepAliveTime内没有获取任务 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 3.一直阻塞 workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } |
说明1. 比较allowCoreThreadTimeOut 和 wc > corePoolSize。
getTask() 中函数中代码如下:
1 |
timed = allowCoreThreadTimeOut || wc > corePoolSize; |
- wc > corePoolSize
在默认情况下,这个allowCoreThreadTimeOut为false,我们只是用”wc > corePoolSize”来判断是否可以根据keepAliveTime来终止线程。所以 使用这个条件时只处理超过核心线程数的线程。
- allowThreadTimeOut
如果这个字段设置为true(可以通过setter方法来设置),线程池来就会一直根据keepAliveTime来终止线程。所以这个字段作用为:强制线程池来执行根据keepAliveTime来终止worker,不再需要在wc>corePoolSize来判断。在使用这个字段时,不只是超过核心线程数的线程会被处理,线程个数小于核心线程数时,也会被处理。
2 处理超时的IDLE THREAD
当我们找到这个超过keepaliveTime的Idle 线程之后,如何处理呢?可以查看processWorkerExit函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 获取线程池执行任务总量 completedTaskCount += w.completedTasks; // 2.从线程池中删除这个worker workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } |
6.3 addWorker(null,false)解释
当Worker中firstTask为null时,如下,此时是指没有指定task,只是空起一个线程,然后这个worker线程去阻塞队列中获取任务。
1 |
addWorker(null, true) |
当存在一个任务command时,如下,此时就是此woker线程首先执行这个command,然后再去阻塞队列中获取其他任务。所以此时也可以理解“firstTask”这个名字的语义了,就是标识这个worker线程第一次执行的任务
1 |
addWorker(command, true) |
6.4 Worker为什么定义成一个Lock?
线程池并发涉及到的锁
(1)线程池全局范围的锁,保护公共变量works
1 |
private final HashSet<Worker> workers = new HashSet<Worker>(); |
(2)worker范围内存的锁。这个锁是为了解决shutdown时执行interruptidleWokers的主线程和worker线程之间的并发性。
Woker为什么定义成一个lock的?如果说在主函数中定义一个lock的话,那么每启动一个worker都需要定义一个这个worker线程对应的锁,这显然不是一个好的方法,所以此时就将worker定义成一个锁对象。
1. 在两个地方使用了woker的lock
(1)在runWorker()代码,如下
1 2 3 4 5 6 7 |
final void runWorker(Worker w) { ...... while (task != null || (task = getTask()) != null) { w.lock(); ..... } |
(2)在interrutidleWorker中
在shutdown时,需要只关闭idel进程,如果可以获取worker的自身的锁,那么说明此线程就是idle线程。
1 2 3 4 5 6 7 8 9 10 11 |
private void interruptIdleWorkers(boolean onlyOne) { .... for (Worker w : workers) { Thread t = w.thread // 校验是否为idele线程 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); .... } |
2. 作用
如上(1)和(2)使用,就是为了隔离 worker线程 和 中断worker线程的线程 两个线程。
6.5 任务队列中元素如何清理
通过阻塞队列的take和poll(time)来返回一个任务task,并从task队列中移除。
7 合理配置线程池
7.1 线程大小
7.2 任务优先级
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
8 线程池监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
- taskCount:线程池需要执行的任务数量。
- completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
- largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
- getPoolSize:线程池的线程数量。
- getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:
1 |
protected void beforeExecute(Thread t, Runnable r) { } |
参考
使用线程池10个建议 https://www.nurkiewicz.com/2014/11/executorservice-10-tips-and-tricks.html