一、序言
关于“池”的概念,我的理解是它是为了让我们更快的获得资源,节省时间,在我所知的所有池(线程池、连接池、常量池、缓存池、对象池等等),都是这个作用,这里我们仅仅分享线程池的相关理解。
1.我们什么时候要用线程池?
在JAVA 里面我们一切都是对象,线程(Thread)同样也是对象,只要是对象那么就要涉及创建、使用、回收等三个主要步骤。通常情况下,创建线程的时间 和 回收(销毁)线程的时间的开销,由JVM控制,而使用过程由我们控制。假设我们在使用时间很短,并且发生频率很高的情况下,那么线程的频繁创建和销毁就会占用大量的的时间,为了减少这种开销,我们利用线程池技术,创建一个线程池,用的时候从里面拿,不用了放回去,再空闲的时间再进行销毁,能节省时间。
并且在一定程度上,无状态的线程是可以复用的,可以减少对象的创建。
二、功能介绍和设计:我们将分析JAVA线程池,然后写一个出来方便理解
1.创建线程池:
// 这是JDK 提供的此线程池的创建 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
a.corePoolSize :线程池基本线程数,因为一般池的创建分为两种,一种是初始化的时候就进行创建默认的最小连数目,还有一种是当使用的时候才创建,直到创建的数目超过基本数值的时候,后面的获取,才根据池里面的数量空闲状态进行返回,在此之前,都是一直创建,直到超过基本值。
b.maximumPoolSize:线程池最大容量
c.keepAliveTime:线程活动保持时间 ,线程在池里面的空闲的时间,相当于外包人员,在长时间没有接到任务的时候,就让他离职!减少负担。
d.unit:时间单位,这个可以参考TimeUtil 提供了很多精确的时间类型
e.workQueue:工作队列,这里负责维护多个任务调度,比如当我们进网吧的时候是需要刷卡的,当人比较多的时候需要排队,而且同时还要应对结账的人员,这里用队列的方式进行管理。
至于BlockingQueue 的实现,我们后面再介绍。
f.threadFactory:线程工厂,这里面提供了多种线程的创建方式
g:RejectedExecutionHandler:饱和策略,和缓存池类似,我们不可能无限制的分配下去,当线程数量到最大值的时候,我们需要用一种策略进行处理。
具体的处理策略我们也留到后面。
我们的设计如下:
2.功能体现:
既然是线程池,里面肯定放的是线程,同时我们肯定要有执行线程的方法execute,先来看看JDK这部分主体代码:
/** * 自己实现线程池,为了方便理解,按照JDK 的进行编写 * * @author Ran */ public class ThreadPool { private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile ThreadFactory threadFactory; private final BlockingQueue<Runnable> workQueue; private volatile long keepAliveTime; private volatile RejectedExecutionHandler handler; // 当前实际线程数 private volatile int poolSize; // 状态锁,主要用于对poolSize,corePoolSize,maximumPoolSize,runState,workers 更新时的锁定 private final ReentrantLock mainLock = new ReentrantLock(); // 线程的一些状态 volatile int runState; static final int RUNNING = 0; // 不接受新任务了,但是已经加入队列的还会执行 static final int SHUTDOWN = 1; // 停止了,队列里面的任务也不执行了 static final int STOP = 2; // 全部停止,会关闭所有正在执行的线程 static final int TERMINATED = 3; // 存放工作线程的集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 记录线程池到达的最高峰值 的线程数 private int largestPoolSize; // 构造 public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } // 执行方法 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 如果线程池数,超过我们的基本连接数,直接执行下面 // 如果当前线程数小于基本线程数,就创建并执行 ,返回 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 如果创建失败再次检测,如果有正在取线程的就放进去 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) // 如果线程池不处于运行状态,或者是第一个线程进入,执行 // 确保始终有一个线程执行该任务 ensureQueuedTaskHandled(command); }else if (!addIfUnderMaximumPoolSize(command)) // 否则就执行其他策略 reject(command); // is shutdown or saturated } } // 添加并返回 private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 可以看出,当前线程数是小于基本线程数,并且线程池处于活动状态 // 那么就进行创建 添加操作 if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; //然后会启动该线程 t.start(); return true; } // 添加操作 private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); // 创建,这路的创建,如果传的null, 会new 一个,默认在DefaultThreadFactory // 里面 可以看到 Thread t = threadFactory.newThread(w); if (t != null) { // 可以看出,创建的线程实际是worker 对象,里面封装了很多内容 w.thread = t; // 然后保存进去。返回 workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; } // 排队后从新检查状态,如果处于非RUNNING 状态,会把刚才队列里面的清掉 // 确保有一个线程来处理这个任务(前提是addThread 要成功) private void ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean reject = false; Thread t = null; try { // 重新检查状态 int state = runState; // 如果挂了,就把刚才那个移除返回true,执行处理的策略了 if (state != RUNNING && workQueue.remove(command)) reject = true; else if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) // 如果线程池禁止添加新任务 了,并且队列不为空,并且基本数未满 t = addThread(null); } finally { mainLock.unlock(); } if (reject) reject(command); else if (t != null) t.start(); } // 调用拒绝执行的策略 void reject(Runnable command) { //handler.rejectedExecution(command, this); } // 队列添加失败(一般是满了)的时候,在满足条件的情况下,会再次创建新 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } // 这是内部类,里面对封装了,我们对线程 private final class Worker implements Runnable { // 针对每个线程任务进行控制, private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; Thread thread; public Worker(Runnable firstTask) { this.firstTask = firstTask; } @Override public void run() { } } }
执行方法小结:
上面代码,估计大家看着有点乱,这里我进行描述,然后借助图例,然后再回过头进行理解,就清楚了。整个执行方法无非就坐了几件事情:
1.当前线程数poolSize 小于 corePoolSize基本线程数,也就是说连最小、最基本的线程数都没满足,那么就会执行addIfUnderCorePoolSize方法。
1.1 addIfUnderCorePoolSize 主要是把一个线程包装成Worker对象,会执行addThread方法,用工厂创建线程,然后保存在我们的集合里,然后执行该线程,OK 返回true,失败返回false.
2.当上面条件不满足的情况,我们会看workQueue 是否满了,如果workQueue.offer(command) 成功,表示未满,就保存进队列。
2.1成功判断如果是第一个进入的线程,poolSize == 0,那么会执行ensureQueuedTaskHandled方法,该方法会再次验证线程池处于什么状态,如果是非RUNNING了,就把刚才加入队列的线程,移除,然后执行拒绝的策略,如果remove 失败了,会创建一个线程来,保证该
3. 如果workQueue.offer(command) 失败,说明队列满了,会执行addIfUnderMaximumPoolSize 方法,该方法是去判断线程池是否满了,未满的情况下 ,创建(包装)线程,并且执行,返回true, 否则返回false.
执行拒绝策略。
好吧,如果你还无法理解,我还是用实际例子:
假设我有4台电脑,准备让4个人帮我打文件:corePoolSize = 4.这时候来了3个人 3<4. 我就给他们穿上工作服(Worker包装),然后 让他们做事(addIfUnderCorePoolSize),过了一会又来2个人,这时候发现人够了,那么就把新来的放进等候室(队列)(workQueue.offer(command) ),然后人来多了,再让他们去等候间的时候,发现人满了,这是就要用你指定的策略了(RejectedExecutionHandler)。当然还有一种可能是也许第一个人进来的时候,可以add 失败,也许电脑就坏了,或者卡死了(!RUNNING),这时候就会为了确这个人有活干(ensureQueuedTaskHandled)你得重新检查一下,重新弄弄系统什么的吧!
下面我copy 的逻辑图,给大家再理解理解,图片来源:http://ifeve.com/java-threadpool/
3. 细节处理
上面我们初步解释了连接池的工作原理,但是里面线程怎么工作,怎么管理,以及淘汰策略怎么完成的,这些现在进行解释:
3.1 线程如何工作?
我们回到worker 里面的run 方法:
@Override public void run() { try { Runnable task = firstTask; firstTask = null; // 一直从队列里面取,知道执行完成 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this);// 退出
上面主要有getTask 和 runTask ,我们分别来看看 这里面做了什么吧;
Runnable getTask() { for (;;) { try { int state = runState; // 放弃队列里面任务执行,直接返回 if (state > SHUTDOWN) return null; Runnable r; // 该状态,会返回已经加入队列连的线程 if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); // 如果超过的基本线程,并且allowCoreThreadTimeOut // 参数允许回收 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // 获取超时,那么就会回收 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else // 否则就一直等待可以线程进入 r = workQueue.take(); if (r != null) return r; // 判断是否可以退出 if (workerCanExit()) { // 如果已经停了,就中断所有工作的线程(除当 // 前线程) if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
我们可以看出,getTask主要是返回队列中的任务,过程中根据池的不同状态做不同处理,获得task之后,我我们再看看runTask 的执行。
private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { // 这段代码的锁和线程池的锁不一样,用多次判断!我不明所以。。 // 在JDK 1.7 里面的变化挺大的,大家可以去参考里面的。 if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); boolean ran = false; // 这是在任务之前执行方法,方便你重写的的。 beforeExecute(thread, task); try { // 看出来了,还是用的Runable 的run 方法 task.run(); ran = true; // 这里也方便重写 afterExecute(task, null); // 这里会记录完成任务的数量 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } }
虽然解释得不很清楚,我相信至少能理解线程在线程池里面主要的额工作流程了。
小结:
1.上面分析了JDK 线程池的基本实现原理,仅供参考
2.JDK1.7 的变化挺大的,我表示很无奈,以后分析源码,还是往JDK1.8 上面靠吧,不然被淘汰了都不知道!
3.整个流程有些在没加锁的地方,老是喜欢用多次判断的方式,因为volatile的可见性,确实可以这么做,我不得不吐槽,写得并不好,这里JDK 1.7 里面进行的大量重构!
4.有写得不好,或者不明白的地方,欢迎大家一起提出,分享,由于篇幅和知识吸收的关系,里面的其他策略,下次分享,这里仅仅分析主要工作流程。
相关推荐
ThreadPoolExecutor源码解析.md
ThreadPoolExecutor源码解析.pdf
线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
1. 任务提交到线程池中,首先会检测线程池中核心线程数是否已经到达到达上限,若未到达则执行步骤2,反之执行步 2. 初始化一条线程执行任务 3. 尝试将任务放到
线程池ThreadPoolExecutor底层原理源码分析
CorePoolSize: 核心线程池大小, 如果核心线程池有空闲的位置, 新的任务就会被核心线程池新建一个线程执行, 执行完毕不会销毁线程, 线程会进入缓冲队列等待再次被运行 MaximunPoolSize: 线程池能创建最大的线程数量,...
Java,线程池,ThreadPoolExecutor
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了...
文中我们深度分析了线程池执行任务的核心流程,在ThreadPoolExecutor类的addWorker(Runnable, boolean)方法中,使用CAS安全的更新线程的数量之后,接下来就是创建新的Worker线程执行任务
如果你打算自己手动配置和调整ThreadPoolExecutor类时,建议先阅读一下下面的注意事项:Core and maximum pool sizesA T
1.资源简介:PyQt5中使用多线程模块QThread解决了PyQt5界面程序执行比较耗时操作时,程序卡顿出现的无响应以及界面输出无法实时显示的问题,采用线程池ThreadPoolExecutor解决了ping多个IP多任务耗时问题。...
线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...
线程池ThreadPoolExecutor的源码分析,含中文注释,深入了解线程池的构造
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
于是乎到现在的Hibernate、MyBatis、Spring、Spring MVC、AQS、ThreadPoolExecutor、CountDownLatch使用场景和核心源码分析。 感觉自己还是真的菜鸡,有太多框架的底层实现都不怎么了解。 当山头被一座一座攻克时,...
线程池的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,那么超出数量的线程排队等候,等其他线程执行完毕再从队列中取出任务来执行。...
ThreadPoolExecutor使用和思考
相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值: ...