背景
在我们日常开发中总会利用多线程去并发的处理请求,又或者用多线程处理文件的并发写入等等,但是这样会碰到一个问题:如果在并发量很大的情况下,每一个请求都开启一个新线程,那么系统就要花费很大的资源和时间去为请求创建线程和销毁,而且有时候线程的创建和销毁会比线程真正执行的时间还长,这样系统肯定是受不了的。那么在这种情况下,我们通常会用线程池来下列问题:
- 减少每个线程创建和销毁的性能开销
- 对线程进行一些管理和维护,比如定时开始、周期执行以及并发数的控制
那么到底什么是线程池呢?我们下面来详细了解一下!
ThreadPoolExecutor介绍
ThreadPoolExecutor
类线程池的实现,通常我们通过它的构造方法来创建不同配置的线程池。查看它的源码我们可以看到它有如下4种构造方法:
//第一种
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//第二种
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
//第三种
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//第四种
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
上述构造方法参数说明:
corePoolSize
核心线程数。即线程池中所存活的线程数,即使这些线程是空闲的,也不会被回收。除非设置了allowCoreThreadTimeOut
为true
。
maximumPoolSize
线程池中所允许的最大线程数量。
keepAliveTime
当线程池中的数量超过核心线程数量时,多余的空闲线程在终止之前等待新任务的最大时间(也可理解为非核心线程在空闲后所存活的时间)。
unit
keepAliveTime
参数的时间单位
workQueue
用于在执行任务之前保存任务的队列。该队列只保存execute
方法所提交的可运行的任务。
该队列的类型通常为BlockingQueue
类型。官网对于该类型的解释如下:
任何类型的BlockingQueue
都可能会被用来传递或者保存被execute
方法提交的任务。此队列的使用与池大小调整相互作用:
1.如果当前运行的线程数量少于核心线程数量,那么执行器总是倾向于添加一个新线程而不是排队。
2.如果当前运行的数量等于或者大于核心线程数量,那么执行器总是倾向于将请求排队而不是添加一个新线程。
3.如果请求不能被排队,那么一个新线程将会被创建除非它将超过线程池所允许的最大线程数量,在这种情况下,任务会被拒绝。
这里有排队的三种策略:
1.直接提交
。工作队列里一个很好的默认选项就是SynchronousQueue
,它会将任务直接传递给线程,而不会保存这些任务。在这里,如果没有立即可用的的线程时,则将任务进行排队的尝试会失败,因此一个新线程将会被创建。此策略在处理可能具有内部依赖项的请求集时避免出现锁。直接提交队列通常需要maximumPoolSize
参数的值没有边界(即无穷大),以避免拒绝新提交的任务。反过来,当提交新任务的速率大于线程处理任务的速率时,就有可能出现无线的线程增长。
2.无边界队列
。当使用无边界的队列(比如说没有设定预先容量的LinkedBlockingQueue
)将会在核心线程很繁忙时造成新提交的任务在队列中进行等待。这也就是说,不会再有超过核心线程数的线程被创建,不管maxmumPoolSize
的值有多大都不会有影响。这可能在每个任务完全独立与其他任务时是合适的,因为每个任务都不会影响其他任务的执行。比如说,在网页服务器中。虽然这种类型的排队方式在平滑短暂的请求突发方面很有用,但是,当提交新任务的速率大于线程处理任务的速率时,也有可能出现无限的队列增长。
3.有界队列
。有界队列(比如说ArrayBlockingQueue
)在使用了maxmumPoolSize
的时候会帮助避免资源被耗尽,但是这将变得非常难以调优和控制。队列大小和最大池大小可以相互交换:使用大队列和小池可以最小化CPU使用、操作系统资源和上下文切换开销,但是会导致人为的底吞吐量。如果任务频繁堵塞(比如说受到IO的限制),那么系统将会为更多的线程安排时间。使用小队列通常需要更大的池,这会使得CPU变得繁忙,但是或许会遇到不可接收的调度开销,着同样也会使得吞吐量下降。
threadFactory
执行器创建新线程时的工厂,又称为线程工厂。可以通过线程工厂对线程的属性进行一些定制。
handler
当执行由于达到了线程边界或者任务队列容量达到饱和而被堵塞时使用的处理器。在这种情况下,执行方法会调用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
方法,java提供了预定义的4种处理策略:
1.ThreadPoolExecutor.AbortPolicy
是默认的处理策略,处理器会在拒绝时抛出一个运行时RejectedExecutionException
异常。
2.ThreadPoolExecutor.CallerRunPolicy
。调用execute本身的线程将运行这个任务。这提供了一个简单的反馈控制机制,可以降低新任务的提交速度。
3.ThreadPoolExecutor.DiscardPolicy
,无法执行的任务将会被丢弃掉。
4.ThreadPoolExecutor.DiscardOldesPolicy
,如果执行器没有shutdown,排在工作队列最前面的任务将会被丢弃掉,然后重新执行(可能再次失败,导致重复执行)。
除了上述4种预先提供的策略之外,我们也可以自定义和用其他的RejectedExecutionHandler
类。但是这样做需要谨慎一点,特别是当策略设计为仅在特定容量或队列策略下工作时。
我们可以通过实现RejectExecutionHandler
接口来实现处理的具体逻辑:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
预先提供的线程池
jdk在Executors
中为很多场景提供了预定义的常用ExecutorService
。
固定大小线程池
顾名思义,就是想线程池的线程数量固定在某一个级别。当线程非常繁忙的时候,就将新的任务存放到任务队列中,一旦线程完成了任务,就从对列中取出队列中排队的任务进行执行。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
无界线程池
无界线程池的无界是指线程数量没有边界,即可以无限的创建线程(在理想状况下)。采用的任务队列是SynchronousQueue
队列,这样保证了在核心线程数很繁忙的情况下,会立马为提交的新任务添加新的进程。因为该线程池中的maxmumPoolSize
参数设置的为Integer.MAX_VALUE
,所以我们可以认为它会一直创建线程而不会拒绝提交的任务(理想状态下)。但是这种情况下会耗尽服务器的CPU资源。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
单一线程池
顾名思义,线程池中只有一个核心线程。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
调度线程池
构建一个定长的线程池,该线程池支持周期性的执行任务。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
总结
通过以上线程池的介绍,我们发现线程池的使用范围以及方式是非常广泛和灵活的,所以我们在日常开发中需要根据自己的业务场景来进行合理的使用。