线程池&&队列
Queue: 基本上,一个队列就是一个先入先出(FIFO)的数据结构
Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。
队列的实现
Java中对于队列的实现分为非阻塞和阻塞两种。
非阻塞队列分为如下:
- LinkedList
LinkedList是双相链表结构,在添加和删除元素时具有比ArrayList更好的性能。但在 Get 与 Set 方面弱于ArrayList。当然,这些对比都是指数据量很大或者操作很频繁的情况下的对比。
- PriorityQueue
PriorityQueue维护了一个有序列表,存储到队列中的元素会按照自然顺序排列。当然,我们也可以给它指定一个实现了 java.util.Comparator 接口的排序类来指定元素排列的顺序。
- ConcurrentLinkedQueue
ConcurrentLinkedQueue 是基于链接节点的并且线程安全的队列。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大小 ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。
阻塞队列分为如下:
阻塞队列定义在了java.util.concurrent包中,java.util.concurrent.BlockingQueue 继承了Queue接口,它有 5 个实现类,分别是:
- ArrayBlockingQueue
一个内部由数组支持的有界队列。初始化时必须指定队列的容量,还可以设置内部的ReentrantLock是否使用公平锁。但是公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO(先进先出)原则对元素进行排序。
它的思想就是如果BlockQueue是空的,那么从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作。
- LinkedBlockingQueue
一个内部由链接节点支持的可选有界队列。初始化时不需要指定队列的容量,默认是Integer.MAX_VALUE,也可以看成容量无限大。此队列按 FIFO(先进先出)排序元素 。
- PriorityBlockingQueue
一个内部由优先级堆支持的无界优先级队列。PriorityBlockingQueue是对 PriorityQueue的再次包装,队列中的元素按优先级顺序被移除。
- DelayQueue
一个内部由优先级堆支持的、基于时间的调度队列。队列中存放Delayed元素,只有在延迟期满后才能从队列中提取元素。当一个元素的getDelay()方法返回值小于等于0时才能从队列中poll中元素,否则poll()方法会返回null。
- SynchronousQueue
一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。
队列常用方法
add 添加一个元素 如果队列已满,抛出一个IllegalStateException(“Queue full”)异常
offer 添加一个元素并返回true 如果队列已满,则返回false
put 添加一个元素 如果队列已满,则阻塞
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
take 移除并返回队列头部的元素 如果队列为空,则阻塞
peek 返回队列头部的元素 如果队列为空,则返回null
poll 移除并返回队列头部的元素 如果队列为空,则返回null
线程池
https://www.cnblogs.com/CarpenterLee/p/9558026.html
https://blog.csdn.net/achuo/article/details/80623893
https://segmentfault.com/a/1190000014741369
https://blog.csdn.net/u013541140/article/details/95225769
什么是线程池?
线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。
如果每个请求都创建一个线程去处理,那么服务器的资源很快就会被耗尽,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
为什么要使用线程池?
创建线程和销毁线程的花销是比较大的,这些时间有可能比处理业务的时间还要长。这样频繁的创建线程和销毁线程,再加上业务工作线程,消耗系统资源的时间,可能导致系统资源不足。(我们可以把创建和销毁的线程的过程去掉)
线程池的作用
线程池作用就是限制系统中执行线程的数量。
1、提高效率 创建好一定数量的线程放在池中,等需要使用的时候就从池中拿一个,这要比需要的时候创建一个线程对象要快的多。
2、方便管理 可以编写线程池管理代码对池中的线程同一进行管理,比如说启动时有该程序创建100个线程,每当有请求的时候,就分配一个线程去工作,如果刚好并发有101个请求,那多出的这一个请求可以排队等候,避免因无休止的创建线程导致系统崩溃。
- 线程池作用就是限制系统中执行线程的数量。
- 根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果。
- 少了浪费了系统资源,多了造成系统拥挤效率不高。
- 用线程池控制线程数量,其他线程排 队等候。
- 一个任务执行完毕,再从队列的中取最前面的任务开始执行。
- 若队列中没有等待进程,线程池的这一资源处于等待。
- 当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
为什么要用线程池?
- 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
- 可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
比较重要的几个类
类 | 描述 |
---|---|
ExecutorService | 真正的线程池接口。 |
ScheduledExecutorService | 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。 |
ThreadPoolExecutor | ExecutorService的默认实现。 |
ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。 |
Executor和Executors的区别?
Executors 工具类的不同方法按照我们的需求创建了不同的线程池,来满足业务的需求。
Executor 接口对象能执行我们的线程任务。
ExecutorService接口继承了Executor接口并进行了扩展,提供了更多的方法我们能获得任务执行的状态并且可以获取任务的返回值。
使用ThreadPoolExecutor 可以创建自定义线程池。
Future 表示异步计算的结果,他提供了检查计算是否完成的方法,以等待计算的完成,并可以使用get()方法获取计算的结果。
new Thread的弊端
public class TestNewThread {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("start");
}
}).start();
}
}
执行一个异步任务你还只是如下new Thread吗?
那你就out太多了,new Thread的弊端如下:
- 每次new Thread新建对象性能差。
- 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
- 缺乏更多功能,如定时执行、定期执行、线程中断。
相比new Thread,Java提供的四种线程池的好处在于:
- 重用存在的线程,减少对象创建、消亡的开销,性能佳。
- 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
- 提供定时执行、定期执行、单线程、并发数控制等功能。
四种线程池
Java通过Executors提供四种线程池,分别为:
- newCachedThreadPoo
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 - newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 - newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。 - newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
Executors创建线程池
Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但是便捷不仅隐藏了复杂性,也为我们埋下了潜在的隐患(OOM,线程耗尽)。
线程池中的几种重要的参数
corePoolSize:就是线程池中的核心线程数量,这几个核心线程,只是在没有用的时候,也不会被回收
maximumPoolSize:就是线程池中可以容纳的最大线程的数量
keepAliveTime:线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使在无任务的情况下也不能被清除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间,
unit:计算这个时间的一个单位。
workQueue:等待队列,任务可以储存在任务队列中等待被执行,执行的是FIFIO原则(先进先出)。
threadFactory:创建线程的线程工厂。
handler:是一种拒绝策略,我们可以在任务满了之后,拒绝执行某些任务。
Executors创建线程池便捷方法列表:
方法名 | 功能 |
---|---|
newFixedThreadPool(int nThreads) | 创建固定大小的线程池 |
newSingleThreadExecutor() | 创建只有一个线程的线程池 |
newCachedThreadPool() | 创建一个不限线程数上限的线程池,任何提交的任务都将立即执行 |
小程序使用这些快捷方法没什么问题,对于服务端需要长期运行的程序,创建线程池应该直接使用ThreadPoolExecutor的构造方法。没错,上述Executors方法创建的线程池就是ThreadPoolExecutor。
ThreadPoolExecutor
Executors中创建线程池的快捷方法,实际上是调用了ThreadPoolExecutor的构造方法(定时任务使用的是ScheduledThreadPoolExecutor),该类构造方法参数列表如下:
// Java线程池的完整构造函数
public ThreadPoolExecutor(
int corePoolSize, // 线程池长期维持的线程数,即使线程处于Idle状态,也不会回收。
int maximumPoolSize, // 线程数的上限
long keepAliveTime, TimeUnit unit, // 超过corePoolSize的线程的idle时长,
// 超过这个时间,多余的线程会被回收。
BlockingQueue<Runnable> workQueue, // 任务的排队队列
ThreadFactory threadFactory, // 新线程的产生方式
RejectedExecutionHandler handler) // 拒绝策略
竟然有7个参数,很无奈,构造一个线程池确实需要这么多参数。这些参数中,比较容易引起问题的有corePoolSize, maximumPoolSize, workQueue以及handler:
corePoolSize和maximumPoolSize设置不当会影响效率,甚至耗尽线程;
workQueue设置不当容易导致OOM;
handler设置不当会导致提交任务时抛出异常。
正确的参数设置方式会在下文给出。
corePoolSize -> 任务队列 -> maximumPoolSize -> 拒绝策略
拒绝策略
当队列满时,此时便是饱和策略发挥作用的时候了,JDK中定义了四种饱和策略:
1、AbortPolicy:终止策略是默认的饱和策略,当队列满时,会抛出一个RejectExecutionException异常(第一段代码就是例子),客户可以捕获这个异常,根据需求编写自己的处理代码
AbortPolicy(中止策略)
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程
使用场景:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。ThreadPoolExecutor 中默认的策略就是AbortPolicy,ExecutorService 接口的系列 ThreadPoolExecutor 因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService 中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。
2、DiscardPolicy:策略会悄悄抛弃该任务。
DiscardPolicy(丢弃策略)
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
功能:直接静悄悄的丢弃这个任务,不触发任何动作
使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了
3、DiscardOldestPolicy:策略将会抛弃下一个将要执行的任务,如果此策略配合优先队列PriorityBlockingQueue,该策略将会抛弃优先级最高的任务
DiscardOldestPolicy(弃老策略)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行
使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较
4、CallerRunsPolicy:调用者运行策略,该策略不会抛出异常,不会抛弃任务,而是将任务回退给调用者线程执行(调用execute方法的线程),由于任务需要执行一段时间,所以在此期间不能提交任务,从而使工作线程有时间执行正在执行的任务。
CallerRunsPolicy(调用者运行策略)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。
使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。
功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程
使用场景:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。ThreadPoolExecutor 中默认的策略就是AbortPolicy,ExecutorService 接口的系列 ThreadPoolExecutor 因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService 中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。
线程池的关闭
关闭线程池可以调用shutdownNow和shutdown两个方法来实现
shutdownNow:对正在执行的任务全部发出interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表。
shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务。
Runnable和Callable
可以向线程池提交的任务有两种:Runnable和Callable,二者的区别如下:
- 方法签名不同,void Runnable.run(), V Callable.call() throws Exception
- 是否允许有返回值,Callable允许有返回值
- 是否允许抛出异常,Callable允许抛出异常。
Callable是JDK1.5时加入的接口,作为Runnable的一种补充,允许有返回值,允许抛出异常。
三种提交任务的方式:
| 提交方式 | 是否关心返回结果 |
| ———————————- | ———————————————– |
| Future
| void execute(Runnable command) | 否 |
| Future<?> submit(Runnable task) | 否,虽然返回Future,但是其get()方法总是返回null |
线程池都有哪几种工作队列
1、ArrayBlockingQueue
是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2、LinkedBlockingQueue
一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
3、SynchronousQueue
一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
4、PriorityBlockingQueue
一个具有优先级的无限阻塞队列。
如何正确使用线程池
避免使用无界队列
不要使用Executors.newXXXThreadPool()快捷方法创建线程池,因为这种方式会使用无界的任务队列,为避免OOM,我们应该使用ThreadPoolExecutor的构造方法手动指定队列的最大长度:
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512), // 使用有界队列,避免OOM
new ThreadPoolExecutor.DiscardPolicy());
明确拒绝任务时的行为
任务队列总有占满的时候,这是再submit()提交新的任务会怎么样呢?RejectedExecutionHandler接口为我们提供了控制方式,接口定义如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
拒绝策略 | 拒绝行为 |
---|---|
AbortPolicy | 抛出RejectedExecutionException |
DiscardPolicy | 什么也不做,直接忽略 |
DiscardOldestPolicy | 丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置 |
CallerRunsPolicy | 直接由提交任务者执行这个任务 |
线程池默认的拒绝行为是AbortPolicy,也就是抛出RejectedExecutionHandler异常,该异常是非受检异常,很容易忘记捕获。如果不关心任务被拒绝的事件,可以将拒绝策略设置成DiscardPolicy,这样多余的任务会悄悄的被忽略。
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512),
new ThreadPoolExecutor.DiscardPolicy());// 指定拒绝策略
submit()和execute()的以及shutdown()和shutdownNow()的区别
- submit(),提交一个线程任务,可以接受回调函数的返回值吗,适用于需要处理返回着或者异常的业务场景
- execute(),执行一个任务,没有返回值
- shutdown(),表示不再接受新任务,但不会强行终止已经提交或者正在执行中的任务
- shutdownNow(),对于尚未执行的任务全部取消,正在执行的任务全部发出interrupt(),停止执行
五种线程池的适应场景
- newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。
- newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
- newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。
- newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
- newWorkStealingPool:创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行