皮皮网

【系统源码品牌】【算命 源码下载】【bootstrap项目源码】jdk blockingqueue源码

时间:2025-01-21 08:57:58 分类:热点 来源:HYPER解析源码

1.记一次CPU占用率较高问题排查
2.喜提JDK的源码BUG一枚!多线程的源码情况下请谨慎使用这个类的stream遍历。
3.java线程池(一):java线程池基本使用及Executors
4.LinkedBlockingQueue

jdk blockingqueue源码

记一次CPU占用率较高问题排查

       在项目开发调试过程中,源码我们遇到了服务组件CPU占用率较高的源码问题。在G内存、源码核CPU的源码系统源码品牌Linux环境下,服务组件在无数据处理情况下,源码CPU占用率维持在-%,源码引起关注。源码

       我们首先分析可能的源码原因,结合实际场景,源码判断可能是源码代码中有耗CPU的操作。接下来,源码我们分步骤进行排查:

       获取进程ID

       查找CPU使用率较高的源码线程,发现三个线程ID分别为、源码、。

       使用ps命令获取线程详细信息,输出至jstack.txt文件便于观察。

       转换操作系统线程ID为进制,定位到堆栈信息中线程ID=F4。

       定位到异常代码在UploadRunner类的run()方法内,while(true)循环获取阻塞队列元素进行消费。使用的是非阻塞方式poll()方法,导致线程自旋。

       排查发现,当前有三个线程在自旋空转,导致CPU占用率持续较高。调整为使用queue.take()方法,让消费线程阻塞获取元素。算命 源码下载

       验证结果,CPU占用率稳定在0%-1%之间,问题得到解决。

       通过分析JDK BlockingQueue阻塞队列的实现方式,了解到调用take()方法后,队列为空时线程会被阻塞等待,从而避免了CPU资源的大量占用。

       总结,通过仔细分析jvm运行原理、运用系统命令和排查工具,我们可以有效地定位和解决此类问题。面对复杂情况,关键在于深入理解原理并灵活应用工具。

喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。

       在探讨问题之前,我们先回顾一下 LinkedBlockingQueue 的线程安全性。在传统的观点中,LinkedBlockingQueue 是线程安全的,因为它内部使用了 ReentrantLock。然而,就在 RocketMQ 的讨论版中,一个问题揭示了 LinkedBlockingQueue 在特定情况下的线程不安全性,引发了我们的好奇心。

       核心问题在于 LinkedBlockingQueue 的 stream 遍历方式,在多线程环境下可能出现死循环。我们通过一个简单的 demo 来深入分析这一现象。首先,引入了一个链接,bootstrap项目源码其中详细展示了如何在多线程环境下复现这一 Bug。

       在分析代码之前,让我们先明确 demo 的基本逻辑:创建了 个线程,每个线程不断调用 offer 和 remove 方法。主线程则通过 stream 对 queue 进行遍历,目标是找到队列中的第一个非空元素。这看似是一个简单的遍历操作,但事实并非如此。

       关键点在于 tryAdvance 方法,看似平凡的遍历操作隐藏了陷阱。当运行代码时,预期的输出并未出现,而是陷入了一个死循环,控制台仅输出了一行信息或交替输出几次后停止。

       我们的疑问指向了 JDK 版本,尤其是 JDK 8。通过替换为 JDK ,我们观察到交替输出的效果。这使得我们大胆推测,这可能是 JDK 8 版本的 Bug。为了验证这一假设,我们进行了详细的分析。

       通过线程 dump 文件,我们发现主线程始终处于可运行状态,似乎没有被锁阻塞。然而,从控制台的输出来看,它似乎处于阻塞状态。这一现象让我们联想到一个经典的dota助手源码场景:线程陷入死循环。

       通过深入源码分析,我们发现了死循环的根源。在 stream 遍历的关键方法 tryAdvance 中,存在一个 while 循环,其条件始终满足,导致死循环。而问题的核心在于移除队列头部元素的代码逻辑,当有其他线程不断调用 remove 方法时,可能会形成特定的节点结构,触发死循环。

       经过详细的分析,我们揭示了这一 Bug 的原理,并通过简化代码演示了整个过程。通过将实例代码简化,我们揭示了死循环是如何在多线程环境下产生的。这不仅有助于理解 Bug 的本质,也为后续的 Bug 修复提供了思路。

       为了验证解决方案的正确性,我们对比了 JDK 8 和 JDK 的源码差异。在 JDK 中,通过引入了一个名为 succ 的方法,成功解决了死循环问题。这一方法通过确保节点不会指向自身,从而避免了死循环的产生。

       通过这篇文章的分析,我们不仅揭示了 LinkedBlockingQueue 在特定条件下的线程不安全性,还探讨了如何通过升级 JDK 版本、避免使用 stream 遍历,以及使用 synchronized 修饰符等方式来规避此类问题。cwm recovery 源码同时,我们还延伸至其他数据结构,如 ConcurrentHashMap,讨论了它们在不同使用场景下的线程安全性问题。

       最后,我们再次强调在多线程环境下,LinkedBlockingQueue 的 stream 遍历方式可能存在一定的问题,可能会导致死循环。理解并解决这类 Bug,对于确保代码的健壮性和性能至关重要。

java线程池(一):java线程池基本使用及Executors

       @[toc] 在前面学习线程组的时候就提到过线程池。实际上线程组在我们的日常工作中已经不太会用到,但是线程池恰恰相反,是我们日常工作中必不可少的工具之一。现在开始对线程池的使用,以及底层ThreadPoolExecutor的源码进行分析。

1.为什么需要线程池

       我们在前面对线程基础以及线程的生命周期有过详细介绍。一个基本的常识就是,线程是一个特殊的对象,其底层是依赖于JVM的native方法,在jvm虚拟机内部实现的。线程与普通对象不一样的地方在于,除了需要在堆上分配对象之外,还需要给每个线程分配一个线程栈、以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。

2.java中线程池的实现

       在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。

       后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。

3.创建线程的工厂方法Executors

       在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。

3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

       这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。

public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}

       这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:

/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}

       这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。

3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

       此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:

/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}

       这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。

3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

       这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:

/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}

       这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。

2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}

       创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的源码。 与之类似的方法:

/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}

       通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}

       上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。

3.5 newWorkStealingPool

       这种方式是在jdk1.8之后新增的。我们先来看看其源码:

public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0

       这个方法实际上返回的是ForkJoinPool。该方法创建了一

LinkedBlockingQueue

        LinkedBlockingDeque在结构上有别于之前讲解过的阻塞队列,它不是Queue而是Deque,中文翻译成双端队列,双端队列指可以从任意一端入队或者出队元素的队列,实现了在队列头和队列尾的高效插入和移除

        LinkedBlockingDeque是链表实现的线程安全的无界的同时支持FIFO、LIFO的双端阻塞队列,可以回顾下之前的LinkedBlockingQueue阻塞队列特点,本质上是类似的,但是又有些不同:

        Queue和Deque的关系有点类似于单链表和双向链表,LinkedBlockingQueue和LinkedBlockingDeque的内部结点实现就是单链表和双向链表的区别,具体可参考源码。

        在第二点中可能有些人有些疑问,两个互斥锁和一个互斥锁的区别在哪里?我们可以考虑以下场景:

        A线程先进行入队操作,B线程随后进行出队操作,如果是LinkedBlockingQueue,A线程入队过程还未结束(已获得锁还未释放),B线程出队操作不会被阻塞等待(锁不同),如果是LinkedBlockingDeque则B线程会被阻塞等待(同一把锁)A线程完成操作才继续执行

        LinkedBlockingQueue一般的操作是获取一把锁就可以,但有些操作例如remove操作,则需要同时获取两把锁,之前的LinkedBlockingQueue讲解曾经说明过

        LinkedBlockingQueue 由于是单链表结构,只能一端操作,读只能在头,写只能在尾,因此两把锁效率更高。LinkedBlockingDeque 由于是双链表结构,两端头尾都能读写,因此只能用一把锁保证原子性。 当然效率也就更低

        ArrayBlockingQueue

        LinkedBlockingQueue

        问题,为什么ArrayBlockingQueue 不能用两把锁

        因为取出后,ArrayBlockingQueue 的元素需要向前移动。

        LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。

        LinkedBlockingQueue一共有三个构造器,分别是无参构造器、可以指定容量的构造器、可以穿入一个容器的构造器。如果在创建实例的时候调用的是无参构造器,LinkedBlockingQueue的默认容量是Integer.MAX_VALUE,这样做很可能会导致队列还没有满,但是内存却已经满了的情况(内存溢出)。

        size()方法会遍历整个队列,时间复杂度为O(n),所以最好选用isEmtpy

        1.判断元素是否为null,为null抛出异常

        2.加锁(可中断锁)

        3.判断队列长度是否到达容量,如果到达一直等待

        4.如果没有队满,enqueue()在队尾加入元素

        5.队列长度加1,此时如果队列还没有满,调用signal唤醒其他堵塞队列

        1.加锁(依旧是ReentrantLock),注意这里的锁和写入是不同的两把锁

        2.判断队列是否为空,如果为空就一直等待

        3.通过dequeue方法取得数据

        3.取走元素后队列是否为空,如果不为空唤醒其他等待中的队列

        原理:在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false。

        原理:如果没有元素,直接返回null;如果有元素,出队

        1、具体入队与出队的原理图:

        图中每一个节点前半部分表示封装的数据x,后边的表示指向的下一个引用。

        1.1、初始化

        初始化之后,初始化一个数据为null,且head和last节点都是这个节点。

        1.2、入队两个元素过后

        1.3、出队一个元素后

        表面上看,只是将头节点的next指针指向了要删除的x1.next,事实上这样我觉的就完全可以,但是jdk实际上是将原来的head节点删除了,而上边看到的这个head节点,正是刚刚出队的x1节点,只是其值被置空了。

        2、三种入队对比:

        3、三种出队对比:

copyright © 2016 powered by 皮皮网   sitemap