18 Fork/Join框架
18.1 什么是Fork/Join
Fork/Join框架是一个实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。
与其他ExecutorService相关的实现相同的是,Fork/Join框架会将任务分配给线程池中的线程。而与之不同的是,Fork/Join框架在执行任务时使用了工作窃取算法。
fork在英文里有分叉的意思,join在英文里连接、结合的意思。顾名思义,fork就是要使一个大任务分解成若干个小任务,而join就是最后将各个小任务的结果结合起来得到大任务的结果。
Fork/Join的运行流程大致如下所示:

需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止。用伪代码来表示如下:
通过上面伪代码可以看出,我们通过递归嵌套的计算得到最终结果,这里有体现分而治之(divide and conquer) 的算法思想。
18.2 工作窃取算法
工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。
工作窃取流程如下图所示:

值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。
另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。
18.3 Fork/Join的具体实现
前面我们说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类ForkJoinTask来实现任务。
18.3.1 ForkJoinTask
ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。
fork()方法:使用线程池中的空闲线程异步提交任务
其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里。
join()方法:等待处理任务的线程处理完毕,获得返回值。
来看下join()的源码:
我们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,下面是ForkJoinPool.join()的流程图: 
RecursiveAction和RecursiveTask
通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveAction和RecursiveTask。
两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveRask是有返回值的ForkJoinTask。
此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。
18.3.2 ForkJoinPool
ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。
ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。
我们来大致看下ForkJoinPool的源码:
WorkQueue
双端队列,ForkJoinTask存放在这里。
当工作线程在处理自己的工作队列时,会从队列尾取任务来执行(LIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队首(FIFO)。
ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。
runState
ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。
18.4 Fork/Join的使用
上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。
下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:
斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:
1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······
如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。
上面例子在本机的输出:
需要注意的是,上述计算时间复杂度为O(2^n),随着n的增长计算效率会越来越低,这也是上面的例子中n不敢取太大的原因。
此外,也并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率,下面我们试试用普通的递归来求f(n)的值,看看是不是要比使用Fork/Join快:
普通递归的例子输出:
通过输出可以很明显的看出来,使用普通递归的效率都要比使用Fork/Join框架要高很多。
这里我们再用另一种思路来计算:
上面例子在笔者所用电脑的输出为:
这里耗时为0不代表没有耗时,是表明这里计算的耗时几乎可以忽略不计,大家可以在自己的电脑试试,即使是n取大很多量级的数据(注意int溢出的问题)耗时也是很短的,或者可以用System.nanoTime()统计纳秒的时间。
为什么在这里普通的递归或循环效率更快呢?因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。
如果要计算的任务比较简单(比如我们案例中的斐波那契数列),那当然是直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。
另外,Java 8 Stream的并行操作底层就是用到了Fork/Join框架,下一章我们将从源码及案例两方面介绍Java 8 Stream的并行操作。
参考资料
Last updated
Was this helpful?