Implementing ThreadPool using a custom LinkedBlockingQueue in Java


Contents of page :
  • What is ThreadPool?
  • How ThreadPool works?
    • How threads in ThreadPool can be stopped?
  • Program to implement ThreadPool in java using custom LinkedBlockingQueue
  • Let’s discuss output in detail, to get better understanding of ThreadPool program

What is ThreadPool?
A ThreadPool is a pool that reuses a fixed number of threads to execute tasks.

At any point, at most nThreads threads are actively processing tasks. If additional tasks are submitted while all threads are busy, they wait in the queue until a thread becomes available.
The ThreadPool implementation internally uses a LinkedBlockingQueue for adding and removing tasks.
In this post I will use a custom LinkedBlockingQueue. You can refer to this post for implementing ThreadPool using the Java API’s LinkedBlockingQueue and for more detailed information on ThreadPool.
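For comparison, here is a minimal sketch of the same idea using the JDK's own fixed-size pool, which is backed by java.util.concurrent.LinkedBlockingQueue. The class name FixedPoolComparison and the method runTasks() are made up for this illustration and are not part of the program in this post.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical comparison class; not part of the post's program.
class FixedPoolComparison {

    // Runs taskCount small tasks on a JDK fixed pool of nThreads threads
    // and returns how many of them completed.
    static int runTasks(int nThreads, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        // Already submitted tasks still run; new ones would be rejected.
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runTasks(2, 5));
    }
}
```

With 2 threads and 5 tasks, at most 2 tasks run at the same time and the other tasks wait in the queue, which is the behaviour the custom ThreadPool in this post reproduces.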


How ThreadPool works?
We instantiate ThreadPool; its constructor creates and starts nThreads threads.
ThreadPool threadPool=new ThreadPool(2);

Here 2 threads will be created and started in ThreadPool.

Each thread then enters the run() method of the ThreadPoolsThread class and calls take() on taskQueue.
  • If a task is available, the thread executes it by calling the task’s run() method (every task implements Runnable).
public void run() {
. . .
    while (true) {   
        . . .   
       Runnable runnable = taskQueue.take();
       runnable.run();
        . . .
    }
. . .
}  

  • Otherwise, the thread waits for a task to become available.
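The take-and-run loop described above can be sketched on its own. This is only an illustration under some assumptions: it uses java.util.concurrent.LinkedBlockingQueue rather than the custom queue, runs a single worker, and the names WorkerLoopSketch and runAll() are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the worker loop; not the post's ThreadPoolsThread.
class WorkerLoopSketch {

    // Starts one worker, feeds it taskCount tasks, and reports whether all ran.
    static boolean runAll(int taskCount) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        CountDownLatch done = new CountDownLatch(taskCount);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = taskQueue.take(); // blocks while the queue is empty
                    task.run();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: leave the loop and let the thread end.
            }
        });
        worker.start();

        try {
            for (int i = 0; i < taskCount; i++) {
                taskQueue.put(done::countDown); // each task just counts down the latch
            }
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.interrupt(); // stop the worker so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("all tasks ran: " + runAll(3));
    }
}
```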



When are tasks added?
When the execute() method of ThreadPool is called, it internally calls put() on taskQueue to add the task.
taskQueue.put(task);


Once a task has been added, put() calls notifyAll(), so all waiting threads are notified that a task is available.
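The hand-off between put() and a thread waiting in take() can be sketched as follows. The queue here is deliberately simplified (unbounded, so put() never has to wait), and the names PutNotifySketch, SimpleQueue and handOff() are hypothetical, not taken from the program in this post.

```java
import java.util.LinkedList;

// Hypothetical illustration of put() waking a thread that waits in take().
class PutNotifySketch {

    // Simplified queue: unbounded, so only take() ever waits.
    static class SimpleQueue<E> {
        private final LinkedList<E> items = new LinkedList<>();

        synchronized void put(E item) {
            items.addLast(item);
            notifyAll(); // wake every thread currently waiting in take()
        }

        synchronized E take() throws InterruptedException {
            while (items.isEmpty()) { // re-check the condition after every wake-up
                wait();
            }
            return items.removeFirst();
        }
    }

    // The consumer may already be waiting in take() when put() runs;
    // either way it ends up receiving the item.
    static String handOff(String message) {
        SimpleQueue<String> queue = new SimpleQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put(message);
        try {
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("received: " + handOff("task"));
    }
}
```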


How threads in ThreadPool can be stopped?
The shutdown() method is used to stop the threads in the ThreadPool. Once shutdown has been initiated, previously submitted tasks are still executed, but no new tasks are accepted.

After a thread has executed a task, it checks whether
  1. pool shutdown has been initiated, and
  2. taskQueue does not contain any unexecuted task (i.e. taskQueue's size is 0).
If both conditions hold, the thread calls interrupt() on itself.

public void run() {
. . .
    while (true) {   
        . . .   
       runnable.run();
        //task EXECUTED
        . . .   

        if(this.threadPool.isPoolShutDownInitiated() &&  
                        this.taskQueue.size()==0)
             this.interrupt();
        

    }
. . .
}  
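Note that the check above runs only after a thread has executed a task, so a thread that is idle inside take() when shutdown() is called keeps waiting. The sketch below shows one possible variation and is not the program used in this post: it assumes java.util.concurrent.LinkedBlockingQueue instead of the custom queue, and the class ShutdownSketch with its methods awaitStop() and demo() is hypothetical. Each worker polls with a timeout and re-checks the shutdown flag, so idle workers also stop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variation: workers poll with a timeout so idle ones can stop too.
class ShutdownSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread[] workers;
    private volatile boolean shutdownInitiated = false;

    ShutdownSketch(int nThreads) {
        workers = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            workers[i] = new Thread(this::workerLoop, "Thread-" + (i + 1));
            workers[i].start();
        }
    }

    private void workerLoop() {
        try {
            // Exit once shutdown was requested and no queued task is left.
            while (!(shutdownInitiated && taskQueue.isEmpty())) {
                Runnable task = taskQueue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // synchronized together with shutdown(), so a task is either fully
    // queued before the flag is set or rejected afterwards.
    synchronized void execute(Runnable task) {
        if (shutdownInitiated) {
            throw new IllegalStateException("pool has been shut down");
        }
        taskQueue.add(task);
    }

    synchronized void shutdown() {
        shutdownInitiated = true;
    }

    // Joins each worker for at most millis; true if all of them have ended.
    boolean awaitStop(long millis) {
        try {
            for (Thread worker : workers) {
                worker.join(millis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (Thread worker : workers) {
            if (worker.isAlive()) {
                return false;
            }
        }
        return true;
    }

    // One task, two workers: one worker stays idle and must still stop.
    // Returns the number of executed tasks, or -1 if a worker is still alive.
    static int demo() {
        ShutdownSketch pool = new ShutdownSketch(2);
        AtomicInteger ran = new AtomicInteger();
        pool.execute(ran::incrementAndGet);
        pool.shutdown();
        return pool.awaitStop(2000) ? ran.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("tasks run before stop: " + demo());
    }
}
```

The trade-off of this design is that an idle worker wakes up every 100 milliseconds to re-check the flag, in exchange for not needing an interrupt from outside.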


        
Program to implement ThreadPool in java using custom LinkedBlockingQueue

package ThreadPoolUsingLinkedBlockingQueueCustom;
import java.util.LinkedList;
import java.util.List;
/**
* Custom BlockingQueue interface.
* This BlockingQueue implementation follows FIFO (first-in-first-out).
* New elements are inserted at the tail of the queue,
* and elements are removed from the head of the queue.
*
* @author AnkitMittal
* Copyright (c), AnkitMittal .
* All Contents are copyrighted and must not be reproduced in any form.
*/
interface BlockingQueueCustom<E> {
    /**
     * Inserts the specified element into this queue
     * only if space is available else
     * waits for space to become available.
     */
    void put(E item)  throws InterruptedException ;
    /**
     * Retrieves and removes the head of this queue
     * only if elements are available else
     * waits for element to become available.
     */
    E take()  throws InterruptedException;
    
    /**
     * Returns size of queue.
     */
    int size();
}
/**
* Custom LinkedBlockingQueue class.
* This BlockingQueue implementation follows FIFO (first-in-first-out).
* New elements are inserted at the tail of the queue,
* and elements are removed from the head of the queue.
*
* @author AnkitMittal
* Copyright (c), AnkitMittal .
* All Contents are copyrighted and must not be reproduced in any form.
*/
class LinkedBlockingQueueCustom<E> implements BlockingQueueCustom<E>{
    private List<E> queue;
    private int  maxSize ; //maximum number of elements queue can hold at a time.
    public LinkedBlockingQueueCustom(int maxSize){
          this.maxSize = maxSize;
          queue = new LinkedList<E>();
    }
    /**
     * Inserts the specified element into this queue
     * only if space is available else
     * waits for space to become available.
     * After inserting element it notifies all waiting threads.
     */
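    public synchronized void put(E item)  throws InterruptedException  {
     
      //wait in a loop while queue is full; the condition is re-checked after
      //every wake-up, because notifyAll() wakes all waiting threads and
      //spurious wake-ups can occur.
      while (queue.size() == maxSize) {
          this.wait();
      }
     
      //space is available, insert element and notify all waiting threads.
      queue.add(item);
      this.notifyAll();
    }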
    /**
     * Retrieves and removes the head of this queue
     * only if elements are available else
     * waits for element to become available.
     * After removing element it notifies all waiting threads.
     */
    public synchronized E take()  throws InterruptedException{
      //wait in a loop while queue is empty; with several consumers, another
      //thread may already have removed the element by the time this one wakes.
      while (queue.size() == 0) {
          this.wait();
      }
      //element is available, notify all waiting threads and remove element.
      this.notifyAll();
      return queue.remove(0);
    }
    /**
     * Returns size of LinkedBlockingQueueCustom.
     */
    public synchronized int size() {
            return queue.size();
    }
}
/**
* ThreadPool is a class which creates a thread pool that reuses a fixed
* number of threads to execute tasks.
* At any point, at most nThreads threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
*
* Once shutdown of ThreadPool is initiated, previously submitted tasks are
* executed, but no new tasks will be accepted.
*
* @author AnkitMittal
* Copyright (c), AnkitMittal .JavaMadeSoEasy.com
* All Contents are copyrighted and must not be reproduced in any form.
*/
class ThreadPool {
   private BlockingQueueCustom<Runnable> taskQueue;
  
   /*
    * Once pool shutDown is initiated, poolShutDownInitiated becomes true.
    * volatile, because worker threads read it without holding the pool's lock.
    */
   private volatile boolean poolShutDownInitiated = false;
   /* Constructor of ThreadPool
    * nThreads is the number of threads that exist in ThreadPool.
    * nThreads threads are created and started.
    */
   public ThreadPool(int nThreads){
       taskQueue = new LinkedBlockingQueueCustom<Runnable>(nThreads);
       //Create and start nThreads number of threads.
       for(int i=1; i<=nThreads; i++){
          ThreadPoolsThread threadPoolsThread=new ThreadPoolsThread(taskQueue,this);
        threadPoolsThread.setName("Thread-"+i);
        System.out.println("Thread-"+i +" created in ThreadPool.");
        threadPoolsThread.start();   //start thread
       }
      
   }
  
   /**
    * Execute the task, task must be of Runnable type.
    */
   public synchronized void  execute(Runnable task) throws Exception{
       if(this.poolShutDownInitiated)
          throw new Exception("ThreadPool has been shutDown, no further tasks can be added");
      
       /*
        * Add task to taskQueue; put() notifies all waiting
        * threads that a task is available.
        */
       System.out.println("task has been added.");
       this.taskQueue.put(task);
   }
   public boolean isPoolShutDownInitiated() {
          return poolShutDownInitiated;
   }
   /**
    * Initiates shutdown of ThreadPool, previously submitted tasks
    * are executed, but no new tasks will be accepted.
    */
   public synchronized void shutdown(){
      this.poolShutDownInitiated = true;
       System.out.println("ThreadPool SHUTDOWN initiated.");
   }
}
/**
* These threads are created and started from constructor of ThreadPool class.
*/
class ThreadPoolsThread extends Thread {
   private BlockingQueueCustom<Runnable> taskQueue;
   private ThreadPool threadPool;
   public ThreadPoolsThread(BlockingQueueCustom<Runnable> queue,
                 ThreadPool threadPool){
       taskQueue = queue;
       this.threadPool=threadPool;
      
   }
   public void run() {
          try {
                 /*
                 * ThreadPool's threads keep running until
                 * ThreadPool shutDown has been initiated and
                 * taskQueue holds no unExecuted tasks
                 * (the thread then interrupts itself).
                 */
                 while (true) {   
                       System.out.println(Thread.currentThread().getName()
                                     +" is READY to execute task.");
                       /*ThreadPool's thread will take() task from taskQueue
                       * only if tasks are available else
                       * waits for tasks to become available.
                       */
                       Runnable runnable = taskQueue.take();
                       System.out.println(Thread.currentThread().getName()
                                     +" has taken task.");
                       //Now, execute task with current thread.
                       runnable.run();               
                      
                       System.out.println(Thread.currentThread().getName()
                                     +" has EXECUTED task.");
                      
                       /*
                       * 1) Check whether pool shutDown has been initiated or not,
                       * if pool shutDown has been initiated and
                       * 2) taskQueue does not contain any
                       *    unExecuted task (i.e. taskQueue's size is 0 )
                       * then interrupt() the thread.
                       */
                       if(this.threadPool.isPoolShutDownInitiated()
                                     &&  this.taskQueue.size()==0){
                              this.interrupt();
                              /*
                              *  Interrupting basically sends a message to the thread
                              *  indicating it has been interrupted but it doesn't cause
                              *  a thread to stop immediately,
                              *
                              *  if sleep is called, thread immediately throws
                              *  InterruptedException
                              */
                              Thread.sleep(1);
                       }  
                 }
          } catch (Exception e) {
                 System.out.println(Thread.currentThread().getName()+" has been STOPPED.");
          }
   }
}
/**
* Task class which implements Runnable.
*/
class Task implements Runnable{  
   @Override
   public void run() {
          try {
                 Thread.sleep(2000);
                 System.out.println(Thread.currentThread().getName()
                              +" is executing task.");
          } catch (InterruptedException e) {
                 e.printStackTrace();
          }
   }
}
/**
* Test ThreadPool.
*/
public class ThreadPoolTest{
   public static void main(String[] args) throws Exception {
          ThreadPool threadPool=new ThreadPool(2); //create 2 threads in ThreadPool
          Runnable task=new Task();
          threadPool.execute(task);
          threadPool.execute(task);
         
          threadPool.shutdown();
         
   }
  
}
/*OUTPUT
Thread-1 created in ThreadPool.
Thread-2 created in ThreadPool.
Thread-1 is READY to execute task.
Thread-2 is READY to execute task.
task has been added.
task has been added.
Thread-1 has taken task.
Thread-2 has taken task.
ThreadPool SHUTDOWN initiated.
Thread-1 is executing task.
Thread-1 has EXECUTED task.
Thread-1 has been STOPPED.
Thread-2 is executing task.
Thread-2 has EXECUTED task.
Thread-2 has been STOPPED.
*/
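One detail of the program above is worth isolating: the ThreadPool constructor passes nThreads as the queue's maxSize, so execute() blocks the caller once nThreads tasks are already waiting in the queue. The sketch below shows that blocking behaviour with a capacity of 1. It is a simplified stand-in for LinkedBlockingQueueCustom, and the names BoundedQueueSketch and transfer() are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, simplified bounded queue to illustrate blocking put() and take().
class BoundedQueueSketch<E> {
    private final LinkedList<E> items = new LinkedList<>();
    private final int maxSize;

    BoundedQueueSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    synchronized void put(E item) throws InterruptedException {
        while (items.size() == maxSize) { // queue full: producer waits
            wait();
        }
        items.addLast(item);
        notifyAll();
    }

    synchronized E take() throws InterruptedException {
        while (items.isEmpty()) { // queue empty: consumer waits
            wait();
        }
        E head = items.removeFirst();
        notifyAll(); // a waiting producer can now insert
        return head;
    }

    // Pushes 1..count through a queue of capacity 1. The producer can never
    // be more than one element ahead of the consumer.
    static List<Integer> transfer(int count) {
        BoundedQueueSketch<Integer> queue = new BoundedQueueSketch<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("received in order: " + transfer(3));
    }
}
```

Because there is a single producer and a single consumer, the elements arrive in the order in which they were inserted (FIFO).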

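Let’s discuss output in detail, to get better understanding of ThreadPool program
Note: each line of output below is followed by its explanation. The exact interleaving of the lines can vary between runs, because it depends on thread scheduling.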

A total of 2 threads were created in the ThreadPool.
Thread-1 created in ThreadPool.
Thread-1 has now been created.
Thread-2 created in ThreadPool.
Thread-2 has now been created.

Thread-1 is READY to execute task.
Thread-1 has entered the run() method and taskQueue’s size is 0, so it is waiting for a task to become available.
Thread-2 is READY to execute task.
Thread-2 has entered the run() method and taskQueue’s size is 0, so it is waiting for a task to become available.

task has been added.
The main thread calls the execute() method of ThreadPool, which internally calls put() on taskQueue to add the task. Once the task is available, all waiting threads are notified.

task has been added.
The main thread calls execute() again; the second task is added and the waiting threads are notified in the same way.

Thread-1 has taken task.
Thread-1 was waiting and has been notified, so it takes a task.

Thread-2 has taken task.
Thread-2 was waiting and has been notified, so it takes a task.

ThreadPool SHUTDOWN initiated.
The main thread calls threadPool.shutdown(). Previously submitted tasks are still executed, but no new tasks will be accepted.

Thread-1 is executing task.
Thread-1 is executing its task; it is in the run() method of the Task class (shutdown was initiated, but previously submitted tasks are still executed).

Thread-1 has EXECUTED task.
Thread-1 has executed task.

Thread-1 has been STOPPED.
Thread-1 has been stopped.

Thread-2 is executing task.
Thread-2 is executing task, it’s in run() method of Task class.

Thread-2 has EXECUTED task.
Thread-2 has executed task.

Thread-2 has been STOPPED.
Thread-2 has been stopped.





