1.记一次CPU占用率较高问题排查
2.喜提JDK的源码BUG一枚!多线程的源码情况下请谨慎使用这个类的stream遍历。
3.java线程池(一):java线程池基本使用及Executors
4.LinkedBlockingQueue
记一次CPU占用率较高问题排查
在项目开发调试过程中,源码我们遇到了服务组件CPU占用率较高的源码问题。在G内存、源码核CPU的源码系统源码品牌Linux环境下,服务组件在无数据处理情况下,源码CPU占用率维持在-%,源码引起关注。源码 我们首先分析可能的源码原因,结合实际场景,源码判断可能是源码代码中有耗CPU的操作。接下来,源码我们分步骤进行排查: 获取进程ID 查找CPU使用率较高的源码线程,发现三个线程ID分别为、源码、。 使用ps命令获取线程详细信息,输出至jstack.txt文件便于观察。 转换操作系统线程ID为进制,定位到堆栈信息中线程ID=F4。 定位到异常代码在UploadRunner类的run()方法内,while(true)循环获取阻塞队列元素进行消费。使用的是非阻塞方式poll()方法,导致线程自旋。 排查发现,当前有三个线程在自旋空转,导致CPU占用率持续较高。调整为使用queue.take()方法,让消费线程阻塞获取元素。算命 源码下载 验证结果,CPU占用率稳定在0%-1%之间,问题得到解决。 通过分析JDK BlockingQueue阻塞队列的实现方式,了解到调用take()方法后,队列为空时线程会被阻塞等待,从而避免了CPU资源的大量占用。 总结,通过仔细分析jvm运行原理、运用系统命令和排查工具,我们可以有效地定位和解决此类问题。面对复杂情况,关键在于深入理解原理并灵活应用工具。喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。
在探讨问题之前,我们先回顾一下 LinkedBlockingQueue 的线程安全性。在传统的观点中,LinkedBlockingQueue 是线程安全的,因为它内部使用了 ReentrantLock。然而,就在 RocketMQ 的讨论版中,一个问题揭示了 LinkedBlockingQueue 在特定情况下的线程不安全性,引发了我们的好奇心。
核心问题在于 LinkedBlockingQueue 的 stream 遍历方式,在多线程环境下可能出现死循环。我们通过一个简单的 demo 来深入分析这一现象。首先,引入了一个链接,bootstrap项目源码其中详细展示了如何在多线程环境下复现这一 Bug。
在分析代码之前,让我们先明确 demo 的基本逻辑:创建了 个线程,每个线程不断调用 offer 和 remove 方法。主线程则通过 stream 对 queue 进行遍历,目标是找到队列中的第一个非空元素。这看似是一个简单的遍历操作,但事实并非如此。
关键点在于 tryAdvance 方法,看似平凡的遍历操作隐藏了陷阱。当运行代码时,预期的输出并未出现,而是陷入了一个死循环,控制台仅输出了一行信息或交替输出几次后停止。
我们的疑问指向了 JDK 版本,尤其是 JDK 8。通过替换为 JDK ,我们观察到交替输出的效果。这使得我们大胆推测,这可能是 JDK 8 版本的 Bug。为了验证这一假设,我们进行了详细的分析。
通过线程 dump 文件,我们发现主线程始终处于可运行状态,似乎没有被锁阻塞。然而,从控制台的输出来看,它似乎处于阻塞状态。这一现象让我们联想到一个经典的dota助手源码场景:线程陷入死循环。
通过深入源码分析,我们发现了死循环的根源。在 stream 遍历的关键方法 tryAdvance 中,存在一个 while 循环,其条件始终满足,导致死循环。而问题的核心在于移除队列头部元素的代码逻辑,当有其他线程不断调用 remove 方法时,可能会形成特定的节点结构,触发死循环。
经过详细的分析,我们揭示了这一 Bug 的原理,并通过简化代码演示了整个过程。通过将实例代码简化,我们揭示了死循环是如何在多线程环境下产生的。这不仅有助于理解 Bug 的本质,也为后续的 Bug 修复提供了思路。
为了验证解决方案的正确性,我们对比了 JDK 8 和 JDK 的源码差异。在 JDK 中,通过引入了一个名为 succ 的方法,成功解决了死循环问题。这一方法通过确保节点不会指向自身,从而避免了死循环的产生。
通过这篇文章的分析,我们不仅揭示了 LinkedBlockingQueue 在特定条件下的线程不安全性,还探讨了如何通过升级 JDK 版本、避免使用 stream 遍历,以及使用 synchronized 修饰符等方式来规避此类问题。cwm recovery 源码同时,我们还延伸至其他数据结构,如 ConcurrentHashMap,讨论了它们在不同使用场景下的线程安全性问题。
最后,我们再次强调在多线程环境下,LinkedBlockingQueue 的 stream 遍历方式可能存在一定的问题,可能会导致死循环。理解并解决这类 Bug,对于确保代码的健壮性和性能至关重要。
java线程池(一):java线程池基本使用及Executors
@[toc] 在前面学习线程组的时候就提到过线程池。实际上线程组在我们的日常工作中已经不太会用到,但是线程池恰恰相反,是我们日常工作中必不可少的工具之一。现在开始对线程池的使用,以及底层ThreadPoolExecutor的源码进行分析。1.为什么需要线程池我们在前面对线程基础以及线程的生命周期有过详细介绍。一个基本的常识就是,线程是一个特殊的对象,其底层是依赖于JVM的native方法,在jvm虚拟机内部实现的。线程与普通对象不一样的地方在于,除了需要在堆上分配对象之外,还需要给每个线程分配一个线程栈、以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。
2.java中线程池的实现在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。
后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。
3.创建线程的工厂方法Executors在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。
3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。
3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。
3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。
2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的源码。 与之类似的方法:
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。
3.5 newWorkStealingPool这种方式是在jdk1.8之后新增的。我们先来看看其源码:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0这个方法实际上返回的是ForkJoinPool。该方法创建了一
LinkedBlockingQueue
LinkedBlockingDequeå¨ç»æä¸æå«äºä¹å讲解è¿çé»å¡éåï¼å®ä¸æ¯Queueèæ¯Dequeï¼ä¸æç¿»è¯æå端éåï¼å端éåæå¯ä»¥ä»ä»»æä¸ç«¯å ¥éæè åºéå ç´ çéåï¼å®ç°äºå¨éå头åéåå°¾çé«ææå ¥å移é¤LinkedBlockingDequeæ¯é¾è¡¨å®ç°ç线ç¨å®å ¨çæ ççåæ¶æ¯æFIFOãLIFOçå端é»å¡éåï¼å¯ä»¥å顾ä¸ä¹åçLinkedBlockingQueueé»å¡éåç¹ç¹ï¼æ¬è´¨ä¸æ¯ç±»ä¼¼çï¼ä½æ¯åæäºä¸åï¼
QueueåDequeçå ³ç³»æç¹ç±»ä¼¼äºåé¾è¡¨åååé¾è¡¨ï¼LinkedBlockingQueueåLinkedBlockingDequeçå é¨ç»ç¹å®ç°å°±æ¯åé¾è¡¨åååé¾è¡¨çåºå«ï¼å ·ä½å¯åèæºç ã
å¨ç¬¬äºç¹ä¸å¯è½æäºäººæäºçé®ï¼ä¸¤ä¸ªäºæ¥éåä¸ä¸ªäºæ¥éçåºå«å¨åªéï¼æ们å¯ä»¥èè以ä¸åºæ¯ï¼
A线ç¨å è¿è¡å ¥éæä½ï¼B线ç¨éåè¿è¡åºéæä½ï¼å¦ææ¯LinkedBlockingQueueï¼A线ç¨å ¥éè¿ç¨è¿æªç»æï¼å·²è·å¾éè¿æªéæ¾ï¼ï¼B线ç¨åºéæä½ä¸ä¼è¢«é»å¡çå¾ ï¼éä¸åï¼ï¼å¦ææ¯LinkedBlockingDequeåB线ç¨ä¼è¢«é»å¡çå¾ ï¼åä¸æéï¼A线ç¨å®ææä½æ继ç»æ§è¡
LinkedBlockingQueueä¸è¬çæä½æ¯è·åä¸æéå°±å¯ä»¥ï¼ä½æäºæä½ä¾å¦removeæä½ï¼åéè¦åæ¶è·å两æéï¼ä¹åçLinkedBlockingQueue讲解æ¾ç»è¯´æè¿
LinkedBlockingQueue ç±äºæ¯åé¾è¡¨ç»æï¼åªè½ä¸ç«¯æä½ï¼è¯»åªè½å¨å¤´ï¼ååªè½å¨å°¾ï¼å æ¤ä¸¤æéæçæ´é«ãLinkedBlockingDeque ç±äºæ¯åé¾è¡¨ç»æï¼ä¸¤ç«¯å¤´å°¾é½è½è¯»åï¼å æ¤åªè½ç¨ä¸æéä¿è¯ååæ§ã å½ç¶æçä¹å°±æ´ä½
ArrayBlockingQueue
LinkedBlockingQueue
é®é¢ï¼ä¸ºä»ä¹ArrayBlockingQueue ä¸è½ç¨ä¸¤æé
å 为ååºåï¼ArrayBlockingQueue çå ç´ éè¦åå移å¨ã
LinkedBlockingQueueå é¨ç±åé¾è¡¨å®ç°ï¼åªè½ä»headåå ç´ ï¼ä»tailæ·»å å ç´ ãæ·»å å ç´ åè·åå ç´ é½æç¬ç«çéï¼ä¹å°±æ¯è¯´LinkedBlockingQueueæ¯è¯»åå离çï¼è¯»åæä½å¯ä»¥å¹¶è¡æ§è¡ãLinkedBlockingQueueéç¨å¯éå ¥é(ReentrantLock)æ¥ä¿è¯å¨å¹¶åæ åµä¸ç线ç¨å®å ¨ã
LinkedBlockingQueueä¸å ±æä¸ä¸ªæé å¨ï¼åå«æ¯æ åæé å¨ãå¯ä»¥æå®å®¹éçæé å¨ãå¯ä»¥ç©¿å ¥ä¸ä¸ªå®¹å¨çæé å¨ãå¦æå¨å建å®ä¾çæ¶åè°ç¨çæ¯æ åæé å¨ï¼LinkedBlockingQueueçé»è®¤å®¹éæ¯Integer.MAX_VALUEï¼è¿æ ·åå¾å¯è½ä¼å¯¼è´éåè¿æ²¡æ满ï¼ä½æ¯å åå´å·²ç»æ»¡äºçæ åµï¼å å溢åºï¼ã
size()æ¹æ³ä¼éåæ´ä¸ªéåï¼æ¶é´å¤æ度为O(n),æ以æ好éç¨isEmtpy
1.å¤æå ç´ æ¯å¦ä¸ºnullï¼ä¸ºnullæåºå¼å¸¸
2.å é(å¯ä¸æé)
3.å¤æéåé¿åº¦æ¯å¦å°è¾¾å®¹éï¼å¦æå°è¾¾ä¸ç´çå¾
4.å¦æ没æé满ï¼enqueue()å¨éå°¾å å ¥å ç´
5.éåé¿åº¦å 1ï¼æ¤æ¶å¦æéåè¿æ²¡æ满ï¼è°ç¨signalå¤éå ¶ä»å µå¡éå
1.å é(ä¾æ§æ¯ReentrantLock)ï¼æ³¨æè¿éçéååå ¥æ¯ä¸åç两æé
2.å¤æéåæ¯å¦ä¸ºç©ºï¼å¦æ为空就ä¸ç´çå¾
3.éè¿dequeueæ¹æ³åå¾æ°æ®
3.åèµ°å ç´ åéåæ¯å¦ä¸ºç©ºï¼å¦æä¸ä¸ºç©ºå¤éå ¶ä»çå¾ ä¸çéå
åçï¼å¨éå°¾æå ¥ä¸ä¸ªå ç´ ï¼ å¦æéå没满ï¼ç«å³è¿åtrueï¼ å¦æéå满äºï¼ç«å³è¿åfalseã
åçï¼å¦æ没æå ç´ ï¼ç´æ¥è¿ånullï¼å¦ææå ç´ ï¼åºé
1ãå ·ä½å ¥éä¸åºéçåçå¾ï¼
å¾ä¸æ¯ä¸ä¸ªèç¹ååé¨å表示å°è£ çæ°æ®xï¼åè¾¹ç表示æåçä¸ä¸ä¸ªå¼ç¨ã
1.1ãåå§å
åå§åä¹åï¼åå§åä¸ä¸ªæ°æ®ä¸ºnullï¼ä¸headålastèç¹é½æ¯è¿ä¸ªèç¹ã
1.2ãå ¥é两个å ç´ è¿å
1.3ãåºéä¸ä¸ªå ç´ å
表é¢ä¸çï¼åªæ¯å°å¤´èç¹çnextæéæåäºè¦å é¤çx1.nextï¼äºå®ä¸è¿æ ·æè§çå°±å®å ¨å¯ä»¥ï¼ä½æ¯jdkå®é ä¸æ¯å°åæ¥çheadèç¹å é¤äºï¼èä¸è¾¹çå°çè¿ä¸ªheadèç¹ï¼æ£æ¯åååºéçx1èç¹ï¼åªæ¯å ¶å¼è¢«ç½®ç©ºäºã
2ãä¸ç§å ¥é对æ¯ï¼
3ãä¸ç§åºé对æ¯ï¼