大家好,我是树哥。,本文将从一个简单的例子出发,与大家解释为啥要有 ForkJoinPool 的存在。接着向大家介绍 ForkJoinPool 的基本信息及使用,最后讲解 ForkJoinPool 的基本原理。,对于线程池来说,我们经常使用的是 ThreadPoolExecutor,可以用来提升任务处理效率。一般情况下,我们使用 ThreadPoolExecutor 的时候,各个任务之间都是没有联系的。但在某些特殊情况下,我们处理的任务之间是有联系的,例如经典的 Fibonacci 算法就是其中一种情况。,对于 Fibonacci 数列来说,我们知道 F (N) = F (N-1) + F (N-2)。当前数值的结果,都依赖后面几个数值的结果。这时候如果用 ThreadPoolExecutor 貌似就无法解决问题了。虽然我们可以单线程的递归算法,则其计算速度较慢,并且无法进行并行计算,无法发挥 CPU 多核的优势。,ForkJoinPool 就是设计用来解决父子任务有依赖的并行计算问题的。 类似于快速排序、二分查找、集合运算等有父子依赖的并行计算问题,都可以用 ForkJoinPool 来解决。对于 Fibonacci 数列问题,如果用 ForkJoinPool 来实现,其实现代码为:,如上面代码所示,我们定义了一个 Fibonacci 类,继承了 RecursiveTask 抽象类。在 Fibonacci 类中,我们定义了拆分逻辑,并调用了 join () 等待子线程执行结果。运行程序,我们会得到如下的结果:,上面代码中提到的 fork () 和 join () 是 ForkJoinPool 提供的 API 接口,主要用于执行任务以及等待子线程结果。关于其详细用法,我们稍后会讲到。,除了用于处理父子任务有依赖的情形,其实 ForkJoinPool 也可以用于处理需要获取子任务执行结果的场景。 例如:我们要计算 1 到 1 亿的和,为了加快计算的速度,我们自然想到算法中的分治原理,将 1 亿个数字分成 1 万个任务,每个任务计算 1 万个数值的综合,利用 CPU 的并发计算性能缩短计算时间。,因为 ThreadPoolExecutor 也可以通过 Future 获取执行结果,因此 ThreadPoolExecutor 也是可行的。这时候我们有两种实现方式,一种是用 ThreadPoolExecutor 实现,一种是用 ForkJoinPool 实现。下面我们将这两种方式都实现一下,看看这两种实现方式有什么不同。,无论哪种实现方式,其大致思路都是:,我们先使用 ThreadPoolExecutor 实现。,首先,定义一个 Calculator 接口,表示计算数字总和这个动作,如下所示。,接着,我们定义一个使用 ThreadPoolExecutor 线程池实现的类,如下所示。,如上面代码所示,我们实现了一个计算单个任务的类 SumTask,在该类中对数值进行累加。其次,我们在 sumUp () 方法中,对 1 亿的数字进行拆分,接着扔给线程池计算,最后阻塞等待计算结果,最终累加起来。,我们运行上面的代码,可以得到顺利得到最终结果,如下所示。,接着我们使用 ForkJoinPool 来实现。,我们首先实现 SumTask 继承 RecursiveTask 抽象类,并在 compute () 方法中定义拆分逻辑及计算。最后在爱 sumUp () 方法中调用 pool 方法进行计算,代码如下所示。,运行上面的代码,结果为:,对比 ThreadPoolExecutor 和 ForkJoinPool 这两者的实现,可以发现它们都有任务拆分的逻辑,以及最终合并数值的逻辑。但 ForkJoinPool 相比 ThreadPoolExecutor 来说,做了一些实现上的封装,例如:,因此对于没有父子任务依赖,但是希望获取到子任务执行结果的并行计算任务,也可以使用 ForkJoinPool 来实现。在这种情况下,使用 ForkJoinPool 实现更多是代码实现方便,封装做得更加好。,使用 ForkJoinPool 来进行并行计算,主要分为两步:,首先,我们需要定义一个 RecursiveTask 或 RecursiveAction 的子类,然后再该类的 compute () 方法中定义拆分逻辑和计算逻辑。 这两个抽象类的区别在于:前者有返回值,后者没有返回值。例如前面讲到的 1 到 1 亿的叠加问题,其定义的 RecursiveTask 实现类 SumTask 的代码如下:,对于 compute () 方法的实现,核心是想清楚:怎么拆分成子任务?什么时候结束拆分?,接着,初始化 ForkJoinPool 线程池,初始化计算任务,最后将任务丢入线程池中。,通过上面两步操作,我们就完成了一个 ForkJoinPool 任务代码的编写。,ForkJoinPool 的设计思想是分治算法,即将任务不断拆分(fork)成更小的任务,最终再合并(join)各个任务的计算结果。通过这种方式,可以充分利用 CPU 资源,再结合工作窃取算法(worksteal)整体提高执行效率。其简单的流程如下图:,,图片来源于思否用户「日拱一兵」,从图中可以看出 ForkJoinPool 要先执行完子任务才能执行上一层任务。因此 ForkJoinPool 最适合有父子任务依赖的场景,其次就是需要获取子任务执行结果的场景。比如:Fibonacci 数列、快速排序、二分查找等。,源码实现,ForkJoinPool 的主要实现类为:ForkJoinPool 和 ForkJoinTask 抽象类。,ForkJoinTask 实现了 Future 接口,可以用于获取处理结果。ForkJoinTask 有两个抽象子类:RecursiveAction 和 RecursiveTask 抽象类,其区别在于前者没有返回值,后者有返回值,其类图如下所示。,,图片来源于思否用户「日拱一兵」,ForkJoinPool 则是具体的逻辑实现,由于暂时没有应用场景,就不了解这么深了,这里就不深入解析了。,感兴趣的朋友可以参考这篇文章:ForkJoinPool 大型图文现场(一阅到底 vs 直接收藏) – SegmentFault 思否。,窃取算法,我们知道 ForkJoinPool 的父子任务之间是有依赖关系的,那么 ForkJoinPool 是如何实现的呢?答案是:利用不同任务队列执行。 在 ForkJoinPool 中有一个数组形式的成员变量 workQueue[],其对应一个队列数组,每个队列对应一个消费线程。丢入线程池的任务,根据特定规则进行转发。,,图片来源于思否用户「日拱一兵」,这样就有一个问题:有些队列可能任务比较多,有些队列任务比较少,这样就会导致不同线程负载不一样,整体不够高效,怎么办呢?,答案是:利用窃取算法,空闲的线程从尾部去消费其他队列的任务。,一般情况下,线程获取自己队列中的任务是 LIFO(Last Input First Output 后进先出)的方式,即类似于栈的操作方式。如下图所示,首先放入队列的时候先将任务 Push 进队列的头部(top),之后消费的时候在 pop 出队列头部(top)。,,图片来源于思否用户「日拱一兵」,而当某个线程对应的队列空闲时,该线程则去队列的底部(base)窃取(poll)任务到自己的队列,然后进行消费。那问题来了:为什么不从头部(top)获取任务,而要从底部(base)获取任务呢? 那是为了避免冲突!如果两个线程同时从顶部获取任务,那就会有多线程的冲突问题,就需要加锁操作,从而降低了执行效率。,
© 版权声明
文章版权归作者所有,未经允许请勿转载。