|
When the hardware processing power is not in accordance with Moore's Law, when the vertical development and choose the level of development of multi-core processors have been widely used. With the further development of future technology may occur hundreds of processing cores, but the existing program running on a multi-core processor and can not be promoted greater performance, the main bottleneck is the program itself is not concurrent processing capability strong, it can not reasonably use multi-core resources.
Existing treatment options from software start trying to use multiple threads, the program is to support multiple computing tasks at the same time, the multi-threaded processing scheme can be more significantly improved application performance in the case of less number of processors but we are more popular in the hardware multithreading mode, but this area has not very good results.
ForkJoin native multi-threaded parallel processing framework Java7 provided, the basic idea is to split the task into small big man, and finally a small task aggregating the results obtained. It is very similar to Hadoop MapReduce framework provides, but MapReduce tasks can compute for all nodes within a cluster, the cluster can take advantage of the ability to complete computing tasks. ForkJoin more like a stand-alone version of MapReduce.
Even without mapreduce, only the application itself the task decomposition and synthesis are also possible, but difficult to achieve considering on their own implementation may bring the complexity of large-scale, so that programmers need for a paradigm to deal with this a class of tasks. In a multithreaded process already has framework ACTOR models such as AKKA based and FORKJOIN is obvious for the realization of the task can be split feature requirements.
Its scene is: if an application can be decomposed into multiple sub-tasks, and combine multiple sub-tasks to get the final result will be able to answer, then it suitable for use FORK / JOIN model to achieve.
Fork / Join two or more classes to complete two things:
ForkJoinTask: we want to use ForkJoin frame, you must first create a ForkJoin task. It provides instructions for performing tasks in the fork () and join the operation mechanism, usually we do not directly inherit ForkjoinTask class, need only directly following the Cheng Qizai class.
1. RecursiveAction, did not return for the results of the task
2. RecursiveTask, for the return value of the task
ForkJoinPool: task to be performed by ForkJoinPool, dividing sub-tasks will be added to the current thread's deque work, go to the head of the queue. When a worker is no task gets a task from the other end of the queue worker threads.
ForkJoin framework uses a work-stealing ideas (work-stealing), the algorithm to steal from other tasks to perform queue, their workflow Pictured:
This algorithm reduces thread waits and competition.
Here is an example:
package com.inspur.jiyq.forkjoin.sum;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask < Integer>
{
private static final long serialVersionUID = -3611254198265061729L;
public static final int threshold = 2;
private int start;
private int end;
public CountTask (int start, int end)
{
this.start = start;
this.end = end;
}
@Override
protected Integer compute ()
{
int sum = 0;
// If the task is sufficiently small computing tasks
boolean canCompute = (end - start) < = threshold;
if (canCompute)
{
for (int i = start; i < = end; i ++)
{
sum + = i;
}
}
else
{
// If the task is greater than the threshold value, it is split into two sub-task computing
int middle = (start + end) / 2;
CountTask leftTask = new CountTask (start, middle);
CountTask rightTask = new CountTask (middle + 1, end);
// Perform subtasks
leftTask.fork ();
rightTask.fork ();
// End Task waiting to be executed merge result
int leftResult = leftTask.join ();
int rightResult = rightTask.join ();
// Merge the subtasks
sum = leftResult + rightResult;
}
return sum;
}
public static void main (String [] args)
{
ForkJoinPool forkjoinPool = new ForkJoinPool ();
// Generate a computing task, calculate 1 + 2 + 3 + 4
CountTask task = new CountTask (1, 100);
// Perform a task
Future < Integer> result = forkjoinPool.submit (task);
try
{
System.out.println (result.get ());
}
catch (Exception e)
{
System.out.println (e);
}
}
}
Like this summation and sorting requirements can be achieved by FORKJOIN thought, but in actual use or to carry out the necessary performance tests to confirm the performance range.
In the above code, the definition of a cumulative task in compute method, it is determined whether the current value is less than a threshold value, and if it is then calculated, if not continue to split, and merge the subtasks intermediate results.
After the task is defined tasks, Fork / Join and provide a framework for the expansion of Executor thread to perform the task. |
|
|
|