2021-04-16 08:26  阅读(101)
文章分类:Java 零基础入门 文章标签:JavaJava 入门
©  原文作者:廖雪峰的网站 原文地址:https://www.liaoxuefeng.com/

Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。

我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:

    ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
    └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
    

还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:

    ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
    └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
    ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
    └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
    

如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:

    ┌─┬─┬─┬─┬─┬─┐
    └─┴─┴─┴─┴─┴─┘
    ┌─┬─┬─┬─┬─┬─┐
    └─┴─┴─┴─┴─┴─┘
    ┌─┬─┬─┬─┬─┬─┐
    └─┴─┴─┴─┴─┴─┘
    ┌─┬─┬─┬─┬─┬─┐
    └─┴─┴─┴─┴─┴─┘
    

这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。

我们来看如何使用Fork/Join对大数据进行并行求和:

    import java.util.Random;
    import java.util.concurrent.*;
    ----
    public class Main {
        public static void main(String[] args) throws Exception {
            // 创建2000个随机数组成的数组:
            long[] array = new long[2000];
            long expectedSum = 0;
            for (int i = 0; i < array.length; i++) {
                array[i] = random();
                expectedSum += array[i];
            }
            System.out.println("Expected sum: " + expectedSum);
            // fork/join:
            ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
            long startTime = System.currentTimeMillis();
            Long result = ForkJoinPool.commonPool().invoke(task);
            long endTime = System.currentTimeMillis();
            System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
        }
    
        static Random random = new Random(0);
    
        static long random() {
            return random.nextInt(10000);
        }
    }
    
    class SumTask extends RecursiveTask<Long> {
        static final int THRESHOLD = 500;
        long[] array;
        int start;
        int end;
    
        SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // 如果任务足够小,直接计算:
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += this.array[i];
                    // 故意放慢计算速度:
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                    }
                }
                return sum;
            }
            // 任务太大,一分为二:
            int middle = (end + start) / 2;
            System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
            SumTask subtask1 = new SumTask(this.array, start, middle);
            SumTask subtask2 = new SumTask(this.array, middle, end);
            invokeAll(subtask1, subtask2);
            Long subresult1 = subtask1.join();
            Long subresult2 = subtask2.join();
            Long result = subresult1 + subresult2;
            System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
            return result;
        }
    }
    

观察上述代码的执行过程,一个大的计算任务0~2000首先分裂为两个小任务0~1000和1000~2000,这两个小任务仍然太大,继续分裂为更小的0~500,500~1000,1000~1500,1500~2000,最后,计算结果被依次合并,得到最终结果。

因此,核心代码SumTask继承自RecursiveTask,在compute()方法中,关键是如何“分裂”出子任务并且提交子任务:

    class SumTask extends RecursiveTask<Long> {
        protected Long compute() {
            // “分裂”子任务:
            SumTask subtask1 = new SumTask(...);
            SumTask subtask2 = new SumTask(...);
            // invokeAll会并行运行两个子任务:
            invokeAll(subtask1, subtask2);
            // 获得子任务的结果:
            Long subresult1 = subtask1.join();
            Long subresult2 = subtask2.join();
            // 汇总结果:
            return subresult1 + subresult2;
        }
    }
    

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。

练习

使用Fork/Join

小结

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率。

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> 使用ForkJoin
上一篇
使用CompletableFuture
下一篇
使用ThreadLocal