本文共 2910 字,大约阅读时间需要 9 分钟。
import java.util.concurrent.ForkJoinPool;/*计算一千万个整数的累加值,一种采用fork/join,另外一种采用直接计算。 * 根据预算结果,发现SumTask的fork任务的阈值选择对并行算法的效率影响很大。 * 当阈值设为50,并行算法的耗时几乎是常规算法的10倍; * 当阈值设为500000,并行算法的耗时是常规算法的一半。 * 综上,并行算法并非适用所有的场景,如果算法不当,并行计算可能并不比普通单线程算法节约时间*/public class Test { private static final int ARRAY_LEN = 10000000; public void test() { ForkJoinPool fjpool = new ForkJoinPool(); RandomNumber random = new RandomNumber(); int[] array = new int[ARRAY_LEN]; long result = 0; long starttime = 0; long endtime = 0; long consumetime = 0; /*fill the array*/ for(int i = 0; i < ARRAY_LEN; i++) { //array[i] = random.getRandom(); array[i] = i+1; } /*start parallel computation*/ starttime = System.currentTimeMillis(); result = fjpool.invoke(new SumTask(array,0,array.length,"root")); endtime = System.currentTimeMillis(); consumetime = endtime-starttime; System.out.println("The fork/join result = "+result); System.out.println("Consume time in millis = "+consumetime); starttime = System.currentTimeMillis(); result = calculate(array); endtime = System.currentTimeMillis(); consumetime = endtime-starttime; System.out.println("The direct calculate result = "+result); System.out.println("Consume time in millis = "+consumetime); } private long calculate(int[] array) { long result = 0; for( int i = 0; i < array.length; i++ ) { result += array[i]; } return result; }}
import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;public class SumTask extends RecursiveTask{ static final int SEQUENTIAL_THRESHOLD = 1000000; private ForkJoinPool fjpool = new ForkJoinPool(); int low; int high; int[] array; public String sequence = null; SumTask(int[] arr, int lo, int hi,String sequence) { array = arr; low = lo; high = hi; this.sequence = sequence; } protected Long compute() { if(high - low <= SEQUENTIAL_THRESHOLD) { long sum = 0; for(int i=low; i < high; ++i) sum += array[i]; return sum; } else { int mid = low + (high - low) / 2; SumTask left = new SumTask(array, low, mid,sequence+"-1"); SumTask right = new SumTask(array, mid, high,sequence+"-2"); left.fork(); //long rightAns = right.compute(); right.fork(); long leftAns = left.join(); long rightAns = right.join(); /*因为right任务也fork了,这里需要新增等待执行结果*/ return leftAns + rightAns; } } /*执行过程总结: * 1、主线程调用invoke,因为worker-thread为0个,因此会调用signalWork加入一个线程,此线程执行root任务; * 2、root任务执行到left.fork(),调用ForkJoinWorkerThread.pushTask,此方法会将left任务加入到执行root任务的工作线程 * 的任务队列,并调用pool.signalWork加入新的worker-thread然后启动,此新的工作线程从运行root的线程任务队列中steal * 了root-1任务运行; * 3、同上,新的工作线程,执行root-2任务; * 4、最后,执行root任务的线程等待两个子任务的线程执行并返回,然后将结果处理返回给调用invoke的主线程,执行结束*/}
转载地址:http://gvhii.baihongyu.com/