Java并发(2):ThreadLocal、线程池
1 线程局部变量(ThreadLocal)
ThreadLocal的实现依赖于Thread类中的一个ThreadLocalMap字段,这是一个存储ThreadLocal变量本身和对应值的映射。每个线程都有自己的ThreadLocalMap实例,用于存储该线程所持有的所有ThreadLocal变量的值。
public static final SimpleDateFormat dateFormat=new SimpleDateFormat("yyyyMMdd HHmm"); |
1.1 内存泄漏
ThreadLocalMap
中使用的 key 为 ThreadLocal
的弱引用,而 value 是强引用。所以,如果 ThreadLocal
没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。
这样一来,ThreadLocalMap
中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap
实现中已经考虑了这种情况,在调用 set()
、get()
、remove()
方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal
方法后最好手动调用remove()
方法
当一个线程结束时,其ThreadLocalMap也会随之销毁,但是ThreadLocal对象本身不会立即被垃圾回收,直到没有其他引用指向它为止。因此,在使用ThreadLocal时需要注意,如果不显式调用remove()方法,或者线程结束时未正确清理ThreadLocal变量,可能会导致内存泄漏,因为ThreadLocalMap会持续持有ThreadLocal变量的引用,即使这些变量不再被其他地方引用。因此,实际应用中需要在使用完ThreadLocal变量后调用remove()方法释放资源。
内存泄露
-
ThreadLocalMap
使用ThreadLocal
的弱引用作为key
,当ThreadLocal
变量被手动设置为null
,即一个ThreadLocal
没有外部强引用来引用它,当系统GC时,ThreadLocal
一定会被回收。这样的话,ThreadLocalMap
中就会出现key
为null
的Entry
,就没有办法访问这些key
为null
的Entry
的value
,如果当前线程再迟迟不结束的话(比如线程池的核心线程),这些key
为null
的Entry
的value
就会一直存在一条强引用链:Thread变量 -> Thread对象 -> ThreaLocalMap -> Entry -> value -> Object 永远无法回收,造成内存泄漏。 -
线程池的核心线程会被复用,如果在一个线程中使用了
ThreadLocal
,并且没有正确清理,当下次这个线程被复用执行其他任务时,可能会得到上一次遗留下来的数据,从而导致错误的结果。
1.2 引用类型
1.强引用(StrongReference)
当内存空间不足,Java 虚拟机抛出 OutOfMemoryError 错误,使程序异常终止
2.软引用(SoftReference)
可有可无,如果内存空间足够,垃圾回收器就不会回收它,如果内存空间不足,就会回收这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。软引用可用来实现内存敏感的高速缓存。
3.弱引用(WeakReference)
可有可无,弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。
4.虚引用(PhantomReference)
虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收。虚引用主要用来跟踪对象被垃圾回收的活动。
虚引用必须和引用队列(ReferenceQueue)联合使用。当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之关联的引用队列中。
弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java 虚拟机就会把这个弱引用加入到与之关联的引用队列中。
2 线程池
池化技术,线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。线程池是管理一系列线程的资源池。
-
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
-
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
-
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
2.1 创建线程池
方式一:通过ThreadPoolExecutor
构造函数来创建(推荐)。
方式二:通过 Executor
框架的工具类 Executors
来创建。
-
FixedThreadPool
:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。 -
SingleThreadExecutor
: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。 -
CachedThreadPool
: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。 -
ScheduledThreadPool
:给定的延迟后运行任务或者定期执行任务的线程池。
方法 | 描述 | |
---|---|---|
newCachedThreadPool | 必要时创建新线程,空闲线程保留60秒 | |
newFixedThreadPool | 包含固定数量线程,空闲线程保留 | LinkedBlockingQueue |
newSingleThreadExecutor | 只有一个线程 | LinkedBlockingQueue |
预定执行 | ||
newScheduledThreadPool | 用于预定执行而构建的固定线程池 | |
newSingleThreadSccheduledExecutor | 用于预定执行而构建的单线程池 | |
Executors 返回线程池对象的弊端如下: |
-
FixedThreadPool
和SingleThreadExecutor
:使用的是无界的LinkedBlockingQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。 -
CachedThreadPool
:使用的是同步队列SynchronousQueue
, 允许创建的线程数量为Integer.MAX_VALUE
,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。 -
ScheduledThreadPool
和SingleThreadScheduledExecutor
:使用的无界的延迟阻塞队列DelayedWorkQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。
2.2 线程池参数
int corePoolSize,//线程池的核心线程数量 |
-
corePoolSize(核心线程数):这是线程池的基本大小,即线程池中始终保持的线程数量,不受空闲时间的影响。当有新任务提交时,如果线程池中的线程数小于corePoolSize,即便存在空闲线程,线程池也会创建一个新线程来执行任务。
-
maximumPoolSize(最大线程数):线程池允许创建的最大线程数量。当线程池中的线程数超过corePoolSize,但小于maximumPoolSize时,只有在队列满的情况下,才会创建新的线程来处理任务。
-
keepAliveTime(空闲线程存活时间):当线程池中的线程数超过corePoolSize时,多余的线程在空闲时间超过keepAliveTime后会被销毁,直到线程池中的线程数不超过corePoolSize。这有助于在系统空闲时节省资源。
-
unit(时间单位):这是keepAliveTime的时间单位,如秒、毫秒等。它定义了如何解释keepAliveTime的值。
-
workQueue(工作队列):这是一个用于存储等待执行的任务的队列。当线程池中的线程数达到corePoolSize,且这些线程都在执行任务时,新提交的任务会被放入workQueue中等待执行。
-
threadFactory(线程工厂):这是一个用于创建新线程的对象。通过自定义线程工厂,我们可以控制新线程的创建方式,例如设置线程的名字、是否是守护线程等。
-
handler(拒绝策略):当所有线程都在繁忙,且workQueue也已满时,新提交的任务会被拒绝。此时,就需要一个拒绝策略来处理这种情况。常见的拒绝策略有抛出异常、丢弃当前任务、丢弃队列中最旧的任务或者由调用线程直接执行该任务等。
ThreadPoolExecutor
3 个最重要的参数:
-
corePoolSize
: 任务队列未达到队列容量时,最大可以同时运行的线程数量。 -
maximumPoolSize
: 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。 -
workQueue
: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数 :
-
keepAliveTime
:线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,多余的空闲线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁,线程池回收线程时,会对核心线程和非核心线程一视同仁,直到线程池中线程的数量等于corePoolSize
,回收过程才会停止。 -
unit
:keepAliveTime
参数的时间单位。 -
threadFactory
:executor 创建新线程的时候会用到。 -
handler
:饱和策略。关于饱和策略下面单独介绍一下。
2.3 拒绝策略
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor
定义一些策略:
-
ThreadPoolExecutor.AbortPolicy
:抛出 RejectedExecutionException来拒绝新任务的处理。 -
ThreadPoolExecutor.CallerRunsPolicy
:任务回退给调用者自己运行。调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果你的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。 -
ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃掉。 -
ThreadPoolExecutor.DiscardOldestPolicy
:此策略将丢弃最早的未处理的任务请求。
2.3.1 CallerRunsPolicy 拒绝策略有什么风险?如何解决?
如果想要保证任何一个任务请求都要被执行的话,那选择
CallerRunsPolicy
拒绝策略更合适一些。
如果走到CallerRunsPolicy
的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞=,影响程序的正常运行。
解决办法
-
增加阻塞队列
BlockingQueue
的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。 -
为了充分利用 CPU,我们还可以调整线程池的
maximumPoolSize
(最大线程数)参数,这样可以提高任务处理速度,避免累计在BlockingQueue
的任务过多导致内存用完。 -
任务持久化
- 设计一张任务表将任务存储到 MySQL 数据库中。
- Redis缓存任务
- 将任务提交到消息队列
任务持久化
:
-
拒绝策略: 实现
RejectedExecutionHandler
接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。 -
任务队列:继承
BlockingQueue
实现一个混合式阻塞队列,该队列包含JDK
自带的ArrayBlockingQueue
。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()
方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从ArrayBlockingQueue
中去取任务。
2.4 线程池阻塞队列
新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
-
容量为
Integer.MAX_VALUE
的LinkedBlockingQueue
(无界队列):FixedThreadPool
和SingleThreadExector
。FixedThreadPool
最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector
只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。 -
SynchronousQueue
(同步队列):CachedThreadPool
。SynchronousQueue
没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool
的最大线程数是Integer.MAX_VALUE
,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。 -
DelayedWorkQueue
(延迟阻塞队列):ScheduledThreadPool
和SingleThreadScheduledExecutor
。DelayedWorkQueue
的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue
添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE
,所以最多只能创建核心线程数的线程。
2.5 处理任务流程
-
如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
-
如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
-
如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
-
如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用
RejectedExecutionHandler.rejectedExecution()
方法。
2.6 线程池中线程异常后,销毁还是复用?
先说结论,需要分两种情况:
-
使用
execute()
提交任务:当任务通过execute()
提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。 -
使用
submit()
提交任务:对于通过submit()
提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()
返回的Future
对象中。当调用Future.get()
方法时,可以捕获到一个ExecutionException
。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。
简单来说:使用execute()
时,未捕获异常导致线程终止,线程池创建新线程替代;使用submit()
时,异常被封装在Future
中,线程继续复用。
这种设计允许submit()
提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而execute()
则适用于那些不需要关注执行结果的场景。
2.7 设计根据任务的优先级来执行的线程池
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列,可以看作是线程安全的 PriorityQueue
,两者底层都是使用小顶堆形式的二叉堆,即值最小的元素优先出队。不过,PriorityQueue
不支持阻塞操作。
要想让 PriorityBlockingQueue
实现对任务的排序,传入其中的任务必须是具备排序能力的,方式有两种:
-
提交到线程池的任务实现
Comparable
接口,并重写compareTo
方法来指定任务之间的优先级比较规则。 -
创建
PriorityBlockingQueue
时传入一个Comparator
对象来指定任务之间的排序规则(推荐)。
-
PriorityBlockingQueue
是无界的,可能堆积大量的请求,从而导致 OOM
继承PriorityBlockingQueue
并重写一下 offer
方法(入队)的逻辑,当插入的元素数量超过指定值就返回 false 。
2.8 Executor框架
-
主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。 -
把创建完成的实现
Runnable
/Callable
接口的 对象直接交给ExecutorService
执行:ExecutorService.execute(Runnable command)
)或者也可以把Runnable
对象或Callable
对象提交给ExecutorService
执行(ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable <T> task)
)。 -
如果执行
ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象(我们刚刚也提到过了执行execute()
方法和submit()
方法的区别,submit()
会返回一个FutureTask 对象)。由于 FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。 -
最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
2.9 关闭线程池
-
shutdown()
:关闭线程池,线程池的状态变为SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。 -
shutdownNow()
:关闭线程池,线程池的状态变为STOP
。线程池会终止当前正在运行的任务,停止处理排队的任务并返回正在等待执行的 List。
2.10 动态线程池
提供三个方法修改线程池参数,通过配置中心进行变更推送:
-
setCorePoolSize
-
setMaximumPoolSize
-
setKeepAliveTime
3 阻塞队列(生产者/消费者模型)
poll和peek返回空来表示失败,因此,不能向队列中插入null
队列 | 描述 | 使用的线程池 | |
---|---|---|---|
LinkedBlockingQueue | 无界队列 | 无上边界,链表实现 | FixedThreadPool 和 SingleThreadExector |
ArrayBlockingQueue(int capacity, boolean fair) | 构建指定容量和公平性设置(可选),循环数组实现 | ||
PriorityBlockingQueue | 优先级阻塞队列 | 无界,阻塞优先队列 | |
DelayQueue | 无界,延迟超过指定时间的元素可以移出 | ScheduledThreadPool 和 SingleThreadScheduledExecutor |
|
SynchronousQueue | 同步阻塞队列 | 没有容量,不存储元素 | CachedThreadPool |