Fork/Join Framework - Parallel programming in java - Detailed explanation with full program


In this thread concurrency tutorial we will learn what the Fork/Join Framework in java is, with programs and examples: how the Fork/Join Framework improves program performance through parallel programming, what divide-and-conquer means in the Fork/Join framework, what ForkJoinPool and the work-stealing approach are, what ForkJoinTask, RecursiveAction and RecursiveTask are, the similarities and differences between RecursiveAction and RecursiveTask, an example that demonstrates RecursiveTask using the fork() and join() methods, and real-world applications of the Fork/Join framework.


Contents of page :
  • What is Fork/Join Framework in java ?
  • The Fork/Join Framework improves program performance in following ways >
  • Difference between traditional multithreading and parallel programming?
  • Divide-and-conquer in Fork/Join framework ?
  • ForkJoinPool in java
    • ForkJoinPool constructors in java >
      • ForkJoinPool( )
      • ForkJoinPool(int parallelism)
    • ForkJoinPool important methods in java >
      • <T> T invoke(ForkJoinTask<T> task)
      • void execute(ForkJoinTask<?> task)  
      • submit() method comes in 4 different forms.
      • int getParallelism()
      • void shutdown()
    • ForkJoinPool features >
      • work-stealing approach

  • ForkJoinTask<V> in java
    • ForkJoinTask important methods in java >
      • ForkJoinTask<V> fork( )
      • V join( )
      • In short, about fork( ) and join( )
      • V invoke( )
      • static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)
      • static void invokeAll(ForkJoinTask<?>... tasks)

    • Some other important methods for checking status of submitted task -
      • boolean isDone()
      • boolean isCompletedNormally()
      • boolean isCompletedAbnormally()
      • boolean isCancelled()

  • RecursiveAction in java
    • protected abstract void compute()
  • RecursiveTask<V> in java
    • protected abstract V compute()
  • Similarity and Difference between RecursiveAction and RecursiveTask in java
  • Example/ Program to demonstrate usage of RecursiveAction (submitted task does not return result) in java >
  • Let’s summarize above (i.e. ForkJoinPool() Vs ForkJoinPool(2)) in tabular form >
  • Example/ Program to demonstrate usage of RecursiveTask (submitted task returns result), using fork() and join() methods in java >
  • Application of Fork/Join framework in real world >


What is Fork/Join Framework in java ?

The Fork/Join Framework was added in JDK 7 and is defined in the java.util.concurrent package.

The Fork/Join framework enables parallel programming. Parallel programming means taking advantage of two or more processors (multicore) in a computer. Parallel programming can improve program performance.

The Fork/Join Framework improves program performance in following ways >
  • The Fork/Join framework makes use of the multiple processors available in the computer, hence enabling parallel processing, and
  • it manages the whole life cycle of the threads, so the programmer does not have to create, start or stop them.


Difference between traditional multithreading and parallel programming?
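Traditional multithreading was primarily used to keep a single CPU busy, for example by running another thread while one thread waits for I/O. Java threads can run on multiple processors, but dividing the work evenly among them and keeping every processor busy is left to the programmer. Parallel programming using the Fork/Join framework handles this automatically: it splits the work into tasks and distributes them across the processors available in the computer.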


The Fork/Join framework is built on the divide-and-conquer strategy.

Divide-and-conquer in Fork/Join framework ?
The divide-and-conquer strategy recursively divides a task into smaller subtasks until each subtask is small enough to be solved independently (directly, without dividing it further). The results of the subtasks are then combined to produce the result of the whole task.
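To make the strategy concrete before any threads are involved, here is a minimal sequential sketch that sums an array by splitting it in half until a piece is below a threshold. The class name DivideAndConquerSum and the threshold of 100 are illustrative choices (the threshold mirrors minimumProcessingSize in this tutorial's programs); the Fork/Join versions follow the same shape but run the two halves as separate tasks.

```java
public class DivideAndConquerSum {
    // Illustrative threshold, same value as minimumProcessingSize in this tutorial's programs.
    static final int THRESHOLD = 100;

    // Returns numbers[start] + ... + numbers[end - 1], splitting the range recursively.
    static long sum(long[] numbers, int start, int end) {
        if (end - start < THRESHOLD) {
            // Small enough: solve directly with a simple loop.
            long total = 0;
            for (int i = start; i < end; i++) {
                total += numbers[i];
            }
            return total;
        }
        // Divide the range into two halves ...
        int mid = (start + end) / 2;
        // ... solve each half recursively, then combine the two partial results.
        return sum(numbers, start, mid) + sum(numbers, mid, end);
    }

    public static void main(String[] args) {
        long[] numbers = new long[100000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i;
        }
        System.out.println(sum(numbers, 0, numbers.length)); // prints 4999950000
    }
}
```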



ForkJoinPool in java
ForkJoinPool implements the ExecutorService interface (it extends AbstractExecutorService). ForkJoinTasks are executed within a ForkJoinPool, which also manages the execution of those tasks.

ForkJoinPool constructors >
  • ForkJoinPool( )
    • Creates a pool.
    • level of parallelism = number of processors available in the system
  • ForkJoinPool(int parallelism)
    • parallelism is the level of parallelism. Its value must be greater than 0 and must not exceed an implementation-defined limit, otherwise an IllegalArgumentException is thrown. It may be larger than the number of processors in the system.
    • The level of parallelism is the target number of worker threads executing simultaneously, so it also determines how many tasks can be executed in parallel.
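A small sketch of the two constructors, assuming a standard JDK 7 or later. The class and method names are hypothetical; it reports the parallelism chosen by each constructor and shows that 0 is rejected while a value larger than the processor count is accepted.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolConstructorsDemo {
    // Returns {parallelism of new ForkJoinPool(), parallelism of new ForkJoinPool(2)}.
    static int[] parallelisms() {
        ForkJoinPool defaultPool = new ForkJoinPool();    // parallelism = available processors
        ForkJoinPool twoWorkerPool = new ForkJoinPool(2); // parallelism = 2
        try {
            return new int[] { defaultPool.getParallelism(), twoWorkerPool.getParallelism() };
        } finally {
            defaultPool.shutdown();
            twoWorkerPool.shutdown();
        }
    }

    // Returns true if the constructor rejects the given parallelism.
    static boolean isRejected(int parallelism) {
        try {
            new ForkJoinPool(parallelism).shutdown();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] p = parallelisms();
        System.out.println("available processors : " + Runtime.getRuntime().availableProcessors());
        System.out.println("new ForkJoinPool()   : " + p[0]);
        System.out.println("new ForkJoinPool(2)  : " + p[1]);
        System.out.println("new ForkJoinPool(0) rejected?  " + isRejected(0));
        System.out.println("new ForkJoinPool(64) rejected? " + isRejected(64));
    }
}
```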

ForkJoinPool important methods in java >
After you have created an instance of ForkJoinPool, you can start a task in a number of different ways. The first task started is the main task; the main task starts subtasks that are also managed by the pool. The different methods for starting tasks are discussed below >
  • <T> T invoke(ForkJoinTask<T> task)
This method starts the task and returns the result of the task. The calling code waits until the method returns.
  • void execute(ForkJoinTask<?> task)
The execute() method can be used to start a task without waiting for its completion.
The calling code continues its execution asynchronously and does not wait for the task to complete, unlike with the invoke() method.
  • submit() method comes in 4 different forms:
    • <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
    • <T> ForkJoinTask<T> submit(Callable<T> task)
    • <T> ForkJoinTask<T> submit(Runnable task, T result)
    • ForkJoinTask<?> submit(Runnable task)
Each form submits the task for execution and returns a ForkJoinTask, which is also a Future, so the result can be obtained later with get() or join().


  • int getParallelism()
  The method returns the level of parallelism of the pool: the value passed to the constructor, or the number of processors available in the system if the no-argument constructor was used.

  • void shutdown()
Initiates an orderly shutdown: previously submitted tasks are executed, but no new tasks will be accepted.

  • List<Runnable> shutdownNow()
    • attempts to cancel and/or stop all tasks, including actively executing ones,
    • tasks submitted or executing concurrently with this call may or may not be rejected,
    • waiting (not yet started) tasks will never execute, and
    • since the method cancels both existing and unexecuted tasks, it always returns an empty list.
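The following sketch starts one task with each of invoke(), execute() and submit(), then calls shutdown() and shows that a later submission is rejected with RejectedExecutionException. The task class RangeSum (which sums 0 to limit-1 without splitting) and the other names are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class PoolMethodsDemo {
    // Hypothetical demo task: returns 0 + 1 + ... + (limit - 1), computed sequentially.
    static class RangeSum extends RecursiveTask<Long> {
        final long limit;
        RangeSum(long limit) { this.limit = limit; }

        @Override
        protected Long compute() {
            long total = 0;
            for (long i = 0; i < limit; i++) total += i;
            return total;
        }
    }

    // Returns {invoke result, execute result, submit result, 1 if rejected after shutdown else 0}.
    static long[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);

        // invoke(): blocks the caller and returns the task's result.
        long viaInvoke = pool.invoke(new RangeSum(1000));

        // execute(): returns immediately; we keep the task so we can join() it later.
        RangeSum executed = new RangeSum(2000);
        pool.execute(executed);

        // submit(): returns a ForkJoinTask, which is also a Future.
        ForkJoinTask<Long> submitted = pool.submit(new RangeSum(3000));

        // shutdown(): already-submitted tasks still run, new ones are rejected.
        pool.shutdown();
        long rejected = 0;
        try {
            pool.execute(new RangeSum(1));
        } catch (RejectedExecutionException e) {
            rejected = 1;
        }

        long viaExecute = executed.join();
        long viaSubmit = submitted.join();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] { viaInvoke, viaExecute, viaSubmit, rejected };
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("invoke  : " + r[0]); // 499500
        System.out.println("execute : " + r[1]); // 1999000
        System.out.println("submit  : " + r[2]); // 4498500
        System.out.println("task rejected after shutdown: " + (r[3] == 1));
    }
}
```

Note that execute() and submit() return before the task finishes; join() is what waits for the result.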



ForkJoinPool features in java >

  • work-stealing approach = ForkJoinPool uses the work-stealing approach for managing threads. Each worker thread in ForkJoinPool maintains its own double-ended queue (deque) of tasks. If one thread's queue is empty, it can take (steal) a task from another thread's queue. This keeps the threads busy and improves the overall performance of the program/application.

  • ForkJoinPool uses daemon threads. Daemon threads do not prevent the JVM from exiting: once all user (non-daemon) threads have finished, the JVM exits and the daemon threads are stopped. So a ForkJoinPool does not have to be shut down explicitly for the program to terminate; still, we may use ForkJoinPool's shutdown() method to release the pool when it is no longer needed.

  • The level of parallelism is a target for the number of actively running worker threads, not a limit on the number of tasks: a ForkJoinPool can hold many more tasks than its level of parallelism, and it may temporarily create extra threads (for example, to compensate for threads blocked in join()). So setting the level of parallelism does not guarantee that exactly that many threads run at any moment.
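To check the daemon-thread behaviour on your own JDK, this sketch (Java 8 or later, because of the lambda) runs one job in a new pool and reports whether the thread that ran it is a ForkJoinWorkerThread with isDaemon() set. The class and method names are illustrative; a CountDownLatch is used instead of join() so that the main thread never runs the job itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DaemonWorkerDemo {
    // Runs a small job on a new pool and reports whether the thread that ran it
    // was a ForkJoinWorkerThread marked as a daemon.
    static boolean workerIsDaemon() {
        ForkJoinPool pool = new ForkJoinPool(2);
        AtomicBoolean daemon = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        pool.execute(() -> {
            Thread current = Thread.currentThread();
            daemon.set(current instanceof ForkJoinWorkerThread && current.isDaemon());
            done.countDown();
        });

        try {
            // Wait on a latch (not join()), so the job is run by a pool worker.
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return daemon.get();
    }

    public static void main(String[] args) {
        System.out.println("pool worker is a daemon thread: " + workerIsDaemon());

        // This pool is never shut down; because its workers are daemon threads,
        // they do not keep the JVM alive once main() returns.
        new ForkJoinPool(2).execute(() -> { });
    }
}
```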


ForkJoinTask<V> in java
ForkJoinTask<V> is an abstract class that defines a task that can be executed and managed by a ForkJoinPool. It implements Future<V>.
The V specifies the result type of the task.
Threads managed by ForkJoinPool execute ForkJoinTasks. A ForkJoinTask is much lighter than a thread, so a small number of threads can serve a large number of tasks.

ForkJoinTask important methods in java >

ForkJoinTask core methods are fork( ) and join()
  • ForkJoinTask<V> fork( )
    The fork( ) method submits the task for asynchronous execution: the thread that calls fork( ) does not wait and continues to run. The task's work is done in its compute() method, which runs within a ForkJoinPool. fork( ) returns the task itself.

  • V join( )
The join( ) method waits for completion of the task on which it is called and returns the result of the task.
  • In short, fork( ) and join( ) are used to start one or more new tasks and then wait for them to complete.

  • V invoke( )
The invoke() method combines the functionality of the fork() and join() methods: it starts the task, waits for the task to complete and returns its result.

  • static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)
This invokeAll() method submits t1 and t2 and waits for both t1 and t2 to complete.
  • static void invokeAll(ForkJoinTask<?>... tasks)
This invokeAll() method submits all the given tasks and waits for all of them to complete.

The invokeAll( ) method is meant to be called from within the overridden compute() method of another ForkJoinTask that is running within a ForkJoinPool.
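A sketch of fork(), join() and invokeAll() together, computing Fibonacci numbers. The class names and the cut-off of 10 are illustrative. ForkJoinFib follows the pattern, also used in the Fibonacci example of the RecursiveTask Javadoc, of forking one subtask and computing the other directly in the current thread; InvokeAllFib submits both subtasks with invokeAll() instead.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FibonacciDemo {
    // Illustrative cut-off: up to this n, Fibonacci numbers are computed sequentially.
    static final int THRESHOLD = 10;

    // Iterative Fibonacci with fib(0) = 0, fib(1) = 1.
    static long sequentialFib(int n) {
        long a = 0, b = 1;
        for (int i = 0; i < n; i++) {
            long next = a + b;
            a = b;
            b = next;
        }
        return a;
    }

    // fork()/join() version: fork one subtask, compute the other in this thread.
    static class ForkJoinFib extends RecursiveTask<Long> {
        final int n;
        ForkJoinFib(int n) { this.n = n; }

        @Override
        protected Long compute() {
            if (n <= THRESHOLD) {
                return sequentialFib(n);
            }
            ForkJoinFib first = new ForkJoinFib(n - 1);
            first.fork();                                   // runs asynchronously, may be stolen
            long second = new ForkJoinFib(n - 2).compute(); // computed directly by this thread
            return second + first.join();                   // wait for the forked subtask
        }
    }

    // invokeAll() version: submit both subtasks and wait for both.
    static class InvokeAllFib extends RecursiveTask<Long> {
        final int n;
        InvokeAllFib(int n) { this.n = n; }

        @Override
        protected Long compute() {
            if (n <= THRESHOLD) {
                return sequentialFib(n);
            }
            InvokeAllFib first = new InvokeAllFib(n - 1);
            InvokeAllFib second = new InvokeAllFib(n - 2);
            invokeAll(first, second);            // returns when both have completed
            return first.join() + second.join(); // results are already available
        }
    }

    static long fib(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new ForkJoinFib(n));
        } finally {
            pool.shutdown();
        }
    }

    static long fibInvokeAll(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new InvokeAllFib(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fib(30) + " " + fibInvokeAll(30)); // prints 832040 832040
    }
}
```

Computing one half in the current thread avoids handing that half to the pool only to wait for it immediately; forking both halves, as in the RecursiveTask program of this tutorial, also gives the correct result.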

Some other important methods for checking the status of a submitted task -
  • boolean isDone() returns true if the task has completed, whether normally, by throwing an exception, or by cancellation.

  • boolean isCompletedNormally() returns true if the task completed without being cancelled and without throwing an exception.

  • boolean isCompletedAbnormally() returns true if the task completed abnormally, either by cancellation or by throwing an exception.

  • boolean isCancelled() returns true if the task was cancelled before it completed normally.
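This sketch shows the four status methods on three tasks: one that completes normally, one whose compute() throws (a hypothetical IllegalStateException used only for the demo), and one cancelled with cancel(true) before it is ever run. Class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class TaskStatusDemo {
    // Completes normally and returns 42.
    static class Succeeds extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() { return 42; }
    }

    // Completes abnormally by throwing (hypothetical failure for the demo).
    static class Fails extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() { throw new IllegalStateException("demo failure"); }
    }

    // {isDone, isCompletedNormally, isCompletedAbnormally, isCancelled}
    static boolean[] flags(ForkJoinTask<?> task) {
        return new boolean[] {
            task.isDone(), task.isCompletedNormally(),
            task.isCompletedAbnormally(), task.isCancelled()
        };
    }

    // Returns the flags of a normal, a failed and a cancelled task, in that order.
    static boolean[][] run() {
        ForkJoinPool pool = new ForkJoinPool(2);

        Succeeds ok = new Succeeds();
        pool.invoke(ok);

        Fails failed = new Fails();
        try {
            pool.invoke(failed); // the exception thrown in compute() is rethrown here
        } catch (RuntimeException expected) {
            // failed is now completed abnormally
        }

        Succeeds cancelled = new Succeeds();
        cancelled.cancel(true); // cancelled before it was ever started

        pool.shutdown();
        return new boolean[][] { flags(ok), flags(failed), flags(cancelled) };
    }

    public static void main(String[] args) {
        String[] names = { "ok", "failed", "cancelled" };
        boolean[][] all = run();
        for (int i = 0; i < all.length; i++) {
            System.out.println(names[i] + " -> " + Arrays.toString(all[i]));
        }
    }
}
```

The expected flags are [true, true, false, false] for the normal task, [true, false, true, false] for the failed one, and [true, false, true, true] for the cancelled one.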



RecursiveAction in java
RecursiveAction is a subclass of ForkJoinTask (more precisely, of ForkJoinTask<Void>) for tasks that do not return a result.
The most important method of RecursiveAction is the compute() method.
protected abstract void compute()
All computations by tasks are performed inside this method.
  • protected > it can be accessed only from classes in the same package and from subclasses.
  • abstract > the first concrete class that extends RecursiveAction must override this abstract method.
  • void > the method does not return a result of the submitted task.
RecursiveAction is used to implement a recursive, divide-and-conquer strategy for tasks that do not return a result.

RecursiveTask<V> in java
RecursiveTask<V> is a subclass of ForkJoinTask<V> for tasks that return a result.
The V specifies the result type of the task.
The most important method of RecursiveTask is the compute() method.
protected abstract V compute()
All computations by tasks are performed inside this method.
  • protected > it can be accessed only from classes in the same package and from subclasses.
  • abstract > the first concrete class that extends RecursiveTask must override this abstract method.
  • V > the method returns the result of the submitted task, of type V.
RecursiveTask is used to implement a recursive, divide-and-conquer strategy for tasks that return a result.


Similarity and Difference between RecursiveAction and RecursiveTask in java

Difference between RecursiveAction and RecursiveTask in java

| | RecursiveAction | RecursiveTask<V> |
|---|---|---|
| Result | The task does not return a result. | The task returns a result. |
| Definition of compute method | protected abstract void compute() | protected abstract V compute() (V specifies the result type of the task) |

Similarity between RecursiveAction and RecursiveTask in java
Both extend ForkJoinTask.
All computations by tasks are performed inside the compute() method.
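A minimal sketch of the difference in practice: both classes are used as ForkJoinTasks, but invoking a RecursiveAction yields null (its type is ForkJoinTask<Void>), while invoking a RecursiveTask<Long> yields a value. For brevity neither task splits its work; the class names DoubleAll and SumAll are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class ActionVsTask {
    // RecursiveAction: works by side effect (doubles every element in place), no result.
    static class DoubleAll extends RecursiveAction {
        final long[] data;
        DoubleAll(long[] data) { this.data = data; }

        @Override
        protected void compute() {
            for (int i = 0; i < data.length; i++) data[i] *= 2;
        }
    }

    // RecursiveTask<Long>: returns a value (the sum of the elements).
    static class SumAll extends RecursiveTask<Long> {
        final long[] data;
        SumAll(long[] data) { this.data = data; }

        @Override
        protected Long compute() {
            long sum = 0;
            for (long v : data) sum += v;
            return sum;
        }
    }

    // Returns {result of the action, result of the task} after running both on one pool.
    static Object[] run(long[] data) {
        ForkJoinPool pool = new ForkJoinPool(2);
        // Both are ForkJoinTasks: RecursiveAction is a ForkJoinTask<Void>.
        ForkJoinTask<Void> action = new DoubleAll(data);
        ForkJoinTask<Long> task = new SumAll(data);
        Void noResult = pool.invoke(action); // always null
        Long sum = pool.invoke(task);        // sum of the doubled elements
        pool.shutdown();
        return new Object[] { noResult, sum };
    }

    public static void main(String[] args) {
        Object[] r = run(new long[] { 1, 2, 3, 4 });
        System.out.println("action result: " + r[0] + ", task result: " + r[1]); // action result: null, task result: 20
    }
}
```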


Example/ Program to demonstrate usage of RecursiveAction (submitted task does not return result) in java >

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/*
* MyRecursiveAction extends  RecursiveAction,
* RecursiveAction extends ForkJoinTask.
*/
class MyRecursiveAction extends RecursiveAction{
  
   //Part of array on which computation will be performed.
   int start;
   int end;
  
   //Array on which computation will be done recursively.
   long[] numberAr;
  
  
   public MyRecursiveAction(int start, int end, long[] numberAr) {  
          this.start=start;
          this.end=end;
          this.numberAr=numberAr;
   }
  
   /* computation will be performed in this method
    * and method won't return any result.
    */
   @Override
   protected void compute() {
  
          /* We divide the array into smaller parts until a part has fewer than
          * minimumProcessingSize elements. Many small parts let every worker
          * thread of the pool stay busy, and idle workers can steal parts
          * from busy ones (work-stealing).
          */
          int minimumProcessingSize=100;
         
          //The part is small enough to be processed directly; no need to divide it.
          if(end-start < minimumProcessingSize){
                 for (int i = start; i < end; i++) {
                       numberAr[i]=numberAr[i]*numberAr[i];
                 }
          }
          //Otherwise, divide the part into two halves.
          else {
                 int mid= (start+end)/2;
                 invokeAll(new MyRecursiveAction(start, mid, numberAr),
                              new MyRecursiveAction(mid, end, numberAr));
          }
   }
}






/** Copyright (c), AnkitMittal JavaMadeSoEasy.com */
public class ForkJoinExample {
   public static void main(String[] args) {
          ForkJoinPool forkJoinPool=new ForkJoinPool();
          long startNanoSec=0;
          long endNanoSec=0;
         
          long[] numberAr=new long[100000];
          for(int i=0;i<100000;i++){
                 numberAr[i]=i;
          }
          System.out.print("original array : ");
          for(int i=0;i<10;i++){
                 System.out.print(numberAr[i]+" ");
          }
         
          MyRecursiveAction task=new MyRecursiveAction(0,numberAr.length,numberAr);
         
          startNanoSec=System.nanoTime();
          forkJoinPool.invoke(task);
          endNanoSec=System.nanoTime();
         
          System.out.print("\narray after being processed recursively "
                       + "using RecursiveAction : ");
          for(int i=0;i<10;i++){
                 System.out.print(numberAr[i]+" ");
          }
         
          System.out.println();
          System.out.println("Level of Parallelism > "+
                                            forkJoinPool.getParallelism());
          System.out.print("Time taken to complete task : "+
                                            (endNanoSec-startNanoSec)+" nanoSeconds");
         
         
         
   }
}
/*OUTPUT
original array : 0 1 2 3 4 5 6 7 8 9
array after being processed recursively using RecursiveAction : 0 1 4 9 16 25 36 49 64 81
Level of Parallelism > 4
Time taken to complete task : 12998811  nanoSeconds
*/

In the above program we initialized an array of 100000 elements and calculated the square of each element by passing the array to the compute() method of the MyRecursiveAction class.

All the computation was performed in the compute() method: the array was divided into smaller arrays until each one was small enough to be processed independently.

The program was executed on a quad-core processor, so by default 4 processors were available to execute it.

The getParallelism() method of forkJoinPool was used to determine the level of parallelism.

In the above program,
ForkJoinPool forkJoinPool=new ForkJoinPool() is used. So, in this case
Level of parallelism = number of processors available in the system, i.e. 4, and the program completed in 12998811 nanoSeconds.

When the above program was executed with
ForkJoinPool forkJoinPool=new ForkJoinPool(2) ->
Level of parallelism = 2
and the program completed in 22109812 nanoSeconds.
Output of the program was
/*
original array : 0 1 2 3 4 5 6 7 8 9
array after being processed recursively using RecursiveAction : 0 1 4 9 16 25 36 49 64 81
Level of Parallelism > 2
Time taken to complete task : 22109812 nanoSeconds
*/


Let’s summarize above (i.e. ForkJoinPool() Vs ForkJoinPool(2)) in tabular form >

| | First run | Second run |
|---|---|---|
| Constructor used | ForkJoinPool forkJoinPool=new ForkJoinPool() | ForkJoinPool forkJoinPool=new ForkJoinPool(2) |
| Level of parallelism | Number of processors available in the system, i.e. 4 (the program was executed on a quad-core processor) | 2 (as 2 was passed to the constructor), i.e. the pool targets 2 worker threads |
| Time taken by program to complete its execution | 12998811 nanoSeconds (with parallelism 4 the program took less time than with parallelism 2) | 22109812 nanoSeconds (2 worker threads were used) |
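A single pair of System.nanoTime() readings includes one-off costs such as JIT compilation, so timings like the ones above vary between runs. The sketch below (hypothetical class ParallelismComparison, repeating the squaring logic of MyRecursiveAction so it compiles on its own) keeps the fastest of several runs for each level of parallelism, and also checks that the result does not depend on the level of parallelism.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelismComparison {
    // Same squaring logic as MyRecursiveAction above, repeated so the sketch compiles on its own.
    static class SquareAction extends RecursiveAction {
        final int start, end;
        final long[] numbers;

        SquareAction(int start, int end, long[] numbers) {
            this.start = start;
            this.end = end;
            this.numbers = numbers;
        }

        @Override
        protected void compute() {
            if (end - start < 100) {
                for (int i = start; i < end; i++) {
                    numbers[i] = numbers[i] * numbers[i];
                }
            } else {
                int mid = (start + end) / 2;
                invokeAll(new SquareAction(start, mid, numbers), new SquareAction(mid, end, numbers));
            }
        }
    }

    static long[] freshArray() {
        long[] numbers = new long[100000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i;
        }
        return numbers;
    }

    // Squares a fresh array on a pool with the given parallelism and returns it.
    static long[] squaredWith(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            long[] numbers = freshArray();
            pool.invoke(new SquareAction(0, numbers.length, numbers));
            return numbers;
        } finally {
            pool.shutdown();
        }
    }

    // Repeats the run several times on one pool and keeps the fastest time in nanoseconds.
    static long bestTimeNanos(int parallelism, int runs) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        long best = Long.MAX_VALUE;
        try {
            for (int r = 0; r < runs; r++) {
                long[] numbers = freshArray();
                long t0 = System.nanoTime();
                pool.invoke(new SquareAction(0, numbers.length, numbers));
                best = Math.min(best, System.nanoTime() - t0);
            }
        } finally {
            pool.shutdown();
        }
        return best;
    }

    public static void main(String[] args) {
        // The result must not depend on the level of parallelism.
        System.out.println("same result with 1 and 2 workers: "
                + Arrays.equals(squaredWith(1), squaredWith(2)));
        int cores = Runtime.getRuntime().availableProcessors();
        for (int p : new int[] { 1, 2, cores }) {
            System.out.println("parallelism " + p + " : best of 5 runs = "
                    + bestTimeNanos(p, 5) + " nanoSeconds");
        }
    }
}
```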





Example/ Program to demonstrate usage of RecursiveTask (submitted task returns result), using fork() and join() methods in java >
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/*
* MyRecursiveTask extends RecursiveTask,
* RecursiveTask extends ForkJoinTask.
*/
class MyRecursiveTask extends RecursiveTask<Long>{
  
   //Part of array on which computation will be performed.
   int start;
   int end;
  
   //Array on which computation will be done recursively.
   long[] numberAr;
  
  
   public MyRecursiveTask(int start, int end, long[] numberAr) {
          this.start=start;
          this.end=end;
          this.numberAr=numberAr;
   }
  
   /* computation will be performed in this method
    * and method will return computed result.
    */
   @Override
   protected Long compute() {
  
          /* We divide the array into smaller parts until a part has fewer than
          * minimumProcessingSize elements. Many small parts let every worker
          * thread of the pool stay busy, and idle workers can steal parts
          * from busy ones (work-stealing).
          */
          int minimumProcessingSize=100;
         
          long sum=0;
         
          //The part is small enough to be processed directly; no need to divide it.
          if(end-start < minimumProcessingSize){
                 for (int i = start; i < end; i++) {
                       sum +=numberAr[i];
                 }
          }
          //Otherwise, divide the part into two halves.
          else {
                 int mid= (start+end)/2;
                
                 MyRecursiveTask task1 = new MyRecursiveTask(start, mid, numberAr);
                 MyRecursiveTask task2 = new MyRecursiveTask(mid, end, numberAr);
                
                 //We will submit subTasks by using fork() method,
                 //fork() method submits the tasks asynchronously, i.e. it
                 //won't wait for tasks to finish.
                 task1.fork();
                 task2.fork();
                
                 //join() method waits for subtask to return result.
                 //Once task will complete it will return and result will be
                 //available. Then we will sum up the result returned by two tasks.
                 sum= task1.join() + task2.join();
          }
         
          return sum;
   }
}






public class ForkJoinExample {
   public static void main(String[] args) {
          ForkJoinPool forkJoinPool=new ForkJoinPool();
         
          long[] numberAr=new long[100000];
          for(int i=0;i<100000;i++){
                 numberAr[i]=i;
          }
  
          MyRecursiveTask task=new MyRecursiveTask(0,numberAr.length,numberAr);
          //store returned result in sum variable.
          long sum = forkJoinPool.invoke(task);
          System.out.println("sum of 100000 elements returned by compute() method = "+sum);
         
         
         
   }
}
/*OUTPUT
sum of 100000 elements returned by compute() method = 4999950000
*/

In the above program, we submitted subtasks using the fork() method. fork() submitted the tasks asynchronously, i.e. it didn't wait for the tasks to finish.
task1.fork();
task2.fork();
The join() method waited for each subtask to return its result. Once a task completed, its result was available, and then we summed up the results returned by the two tasks.
sum= task1.join() + task2.join();

We used the divide-and-conquer strategy: we recursively divided an array into smaller subarrays until each subarray was small enough to be solved independently.
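The printed value can be checked by hand: since numberAr[i] = i, the program sums the arithmetic series 0 + 1 + ... + 99999, with n = 100000 elements.

```latex
\sum_{i=0}^{n-1} i = \frac{(n-1)\,n}{2},
\qquad n = 100000 \;\Rightarrow\; \frac{99999 \cdot 100000}{2} = 4999950000
```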



Application of Fork/Join framework in real world >
We can use the Fork/Join framework to calculate the sum of an array of 100000 or even more numbers. The Fork/Join framework uses the divide-and-conquer strategy to enable parallel programming: it recursively divides an array into smaller subarrays until each subarray is small enough to be solved independently.
Also, ForkJoinPool uses the work-stealing approach for managing threads. Each thread in ForkJoinPool maintains a queue of tasks. If one thread's queue is empty, it can take a task from another thread's queue. This improves the program's overall performance.
Please refer to the above section for the program.

Summary>
In this thread concurrency tutorial we learned what the Fork/Join Framework in java is, with programs and examples: how the Fork/Join Framework improves program performance through parallel programming, what divide-and-conquer means in the Fork/Join framework, what ForkJoinPool and the work-stealing approach are, what ForkJoinTask, RecursiveAction and RecursiveTask are, the similarities and differences between RecursiveAction and RecursiveTask, an example that demonstrates RecursiveTask using the fork() and join() methods, and real-world applications of the Fork/Join framework.






