实现线程池:Github: custom pool 可视化线程池执行流程:Github: ThreadPoolWorkFlowTest
线程池模型
线程池是一个生产者消费者模型;业务中产生任务,由线程池进行消费; 1、任务:即告知线程如何处理特定数据;任务 = 函数 + 数据; 2、队列:缓冲待处理任务; 3、线程:由线程池管理的可以复用的线程,消费队列任务;
为什么要线程池
1、创建线程和销毁线程的花销是比较大的;使用线程池提前创建线程,直接用即可,提高运行时效率
2、线程不进行管理,设计不当,有可能无休止占用资源;使用线程池方便管理,避免无休止的创建线程,占用系统资源;
3、不建议使用
- Executors线程池参数被隐藏,不可控;
- 如:SingleThreadExecutor、newFixedThreadExecutor都会使用无界队列,存在内存安全问题;
线程池使用注意事项
1. 区分任务类型
- CPU密集型:不依赖外部IO,仅执行计算任务;
- IO密集型:任务中计算部分占用不多,更多的是磁盘、网络IO等;
2. 不同的线程池处理不同的任务
可以对特定任务进行特定的线程池配置,为后期优化提供可能;
3. 线程数量的设定
- CPU密集型:这类任务执行中由于不需要等待IO,因此不需要过多的线程;
- ==线程数 == CPU核数即可==
- IO密集型:需要计算任务中的计算和IO时间占比;得出经验公式:
- 其中,N:CPU核数;WT:IO等待时间;CT:CPU计算时间;
- 当WT==CT,线程数可以设为2N;
- IO等待时间占比越大,可以适当从2N往上增加线程数;
ThreadPoolExecutor
ThreadPoolExecutor线程池参数
// 最多的七参构造器,前五个参数必须 public ThreadPoolExecutor(int corePoolSize, // 核心池大小 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 空闲线程等待时间 TimeUnit unit, // 等待时间单位 BlockingQueue<Runnable> workQueue, // 任务队列 ThreadFactory threadFactory, //线程工厂 RejectedExecutionHandler handler) //拒绝策略
- corePoolSize:核心池的大小,并非线程的最大数量
- maximumPoolSize > corePoolSize
- 在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
- maximumPoolSize:线程池的最大线程数,表示线程池中最多能创建多少个线程
- 非核心线程 = maximumPoolSize-corePoolSize
- 只有队列无法存放更多的任务,才会启用非核心线程;
- 冗余思想设计:核心线程数代表任务执行常态需要的数量,但是对于不好把控的业务,需要设计成冗余的,如果任务过多,需要增加额外的线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会被销毁
- 默认:只有线程池内线程数大于corePoolSize的线程,keepAliveTime才会对其计时
- 当一个线程的空闲时间大于keepAliveTime,则会被终止
- 如果调用了allowCoreThreadTimeOut(boolean),线程池内线程数小于corePoolSize,keepAliveTime也会起作用
- 如果任务的执行时间 > 线程的存活时间,线程则不会被复用;
- unit:参数keepAliveTime的时间单位(七种单位)
- workQueue:选择一个阻塞队列
- threadFactory:线程工厂,线程池用来创建线程的工厂方法。如果不传此参数
- 默认:Executors.defaultThreadFactory()
- NamedThreadFactory:命名线程池
- 默认:
- RejectedExecutionHandler:表示当拒绝处理任务时的策略(线程池无法再容纳的线程) 如果不传此参数,默认:ThreadPoolExecutor.AbortPolicy
// ============ 这里的丢弃,都是针对队列的线程任务 ============ // 丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.AbortPolicy // 也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardPolicy // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.DiscardOldestPolicy // 由调用线程处理该任务 ThreadPoolExecutor.CallerRunsPolicy
执行流程
可视化线程池执行流程:Github: ThreadPoolWorkFlowTest
提交优先级:核心 > 队列 > 非核心
1、核心线程未满,直接交给核心线程执行; 2、核心线程已经满,并且正在运行,任务提交到队列; 3、核心满,队列满,任务提交给非核心线程; 4、最大线程数满(核心+非核心)、队列满,则拒绝提交;
执行优先级:核心 > 非核心 > 队列
1、先执行核心线程持有的任务 2、执行非核心线程持有的任务 3、最后从队列中获取任务
为什么优先放入队列
队列起到缓冲的作用,创建线程代价比较高,如果每次超出核心线程就创建新的线程,为什么不把核心线程数直接设置大一点呢?
理论上,可以没有非核心线程,但是不能没有队列;非核心线程是为队列而服务的,优先级比队列更低;
线程池状态
Running:正常运行状态;
ShutDown:调用
线程池核心方法
1、
- isShutdown():线程池是否被shutdown,正在停止/或已经停止都算shutdown
- isTerminated():线程池是否已经停止
- isTerminating():线程池是否正在停止,shutdown之后,真正停止之前都是terminating
@Test public void shutdown_thread_pool() throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { String message = "Thread [" + i + "]+ is running"; threadPoolExecutor.execute(() -> { System.out.println(message); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { System.out.println("interrupted"); } finally { latch.countDown(); } }); } // 阻止新来的任务提交,对已经提交了的任务不会产生任何影响, 队列也不受影响 会继续执行完毕,仅仅是不再接受新的任务 // 线程池状态变为:shutdown threadPoolExecutor.shutdown(); logThreadPoolStatus(); // isShutdown:true, isTerminated:false, isTerminating:true,queue size:4 latch.await(); boolean res = threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS); logThreadPoolStatus(); // isShutdown:true, isTerminated:true, isTerminating:false,queue size:0 }
拒绝策略
https://cloud.tencent.com/developer/article/1520860
Executors
ThreadPoolExecutor的工厂类,提供已经基本配置的线程池;
不建议使用
- Executors线程池参数被隐藏,不可控;
- 如:SingleThreadExecutor、newFixedThreadExecutor都会使用无界队列,存在内存安全问题;
线程池监控及动态调整
ThreadPoolExecutor提供了线程池核心参数的set/get方法;
线程池监控
ThreadPoolExecutor提供了核心线程数、最大线程数、队列、活跃线程数、池内任务总数(正在运行 + 等待运行)等数据的get方法;
可以对外暴露接口,根据线程池的id,获取线程池,再获取线程池的参数,来监控线程池;
线程参数动态调整
public class ThreadPoolExecutor { void setCorePoolSize(); }
ForkJoinPool
传统的线程池将平等执行所有任务,依赖于执行队列;
在传统的线程池基础上,增加分叉、合并的操作,实现并行执行,提高执行效率;
适用场景:计算密集型的父子关系任务;具有优先级的任务;
因此,在web服务领域,使用较少,不适合IO密集型场景;