Friday, December 6, 2019

How to Create Your Own BlockingQueue - Java Program

This post shows how to create your own BlockingQueue in Java using ReentrantLock and the Condition interface. The Condition interface provides the methods await() and signal(), which play the same role for an explicit Lock that wait() and notify() play for an intrinsic (synchronized) lock.
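To make the correspondence between await()/signal() and wait()/notify() concrete, here is a minimal sketch of a bounded buffer built on the intrinsic lock instead. The class name WaitNotifyBuffer and the helper method transfer() are hypothetical names chosen for this illustration. Because an intrinsic lock has only one wait set shared by producers and consumers, the sketch uses notifyAll() rather than notify().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class: a bounded buffer using synchronized with wait()/notifyAll()
class WaitNotifyBuffer {
  private final Object[] items = new Object[5];
  private int putIndex, takeIndex, count;

  public synchronized void put(Object x) throws InterruptedException {
    // Re-check the condition in a loop after every wakeup
    while (count == items.length) {
      wait(); // counterpart of Condition.await()
    }
    items[putIndex] = x;
    putIndex = (putIndex + 1) % items.length;
    count++;
    // Producers and consumers share one wait set, so wake all waiters
    // and let each thread re-check its own condition
    notifyAll(); // counterpart of Condition.signalAll()
  }

  public synchronized Object take() throws InterruptedException {
    while (count == 0) {
      wait();
    }
    Object x = items[takeIndex];
    items[takeIndex] = null; // drop the stale reference
    takeIndex = (takeIndex + 1) % items.length;
    count--;
    notifyAll();
    return x;
  }

  // Hypothetical helper: one producer thread puts 1..n while the
  // calling thread takes n values and returns them in arrival order
  static List<Object> transfer(int n) {
    WaitNotifyBuffer buffer = new WaitNotifyBuffer();
    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= n; i++) {
          buffer.put(i);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    List<Object> taken = new ArrayList<>();
    try {
      for (int i = 0; i < n; i++) {
        taken.add(buffer.take());
      }
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return taken;
  }
}
```

With a single producer and a single consumer the values come out in the order they were put in. The Lock and Condition version used in the rest of this post can keep separate conditions for "not full" and "not empty", so it only needs to wake a thread of the right kind.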

BlockingQueue Java Program

Here we have a class called BufferClass that holds an array of type Object with length 5. So 5 is the bound for the buffer: once 5 values have been added to the array, any thread that tries to add another value is blocked until at least one value is retrieved.

The put() and take() methods add a value to the array and retrieve a value from it, respectively.

BufferClass

 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

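public class BufferClass {
  private final Lock lock = new ReentrantLock();
  // Producers wait on this condition while the buffer is full
  private final Condition produce = lock.newCondition();
  // Consumers wait on this condition while the buffer is empty
  private final Condition consume = lock.newCondition();

  // Fixed-size circular buffer; its length (5) is the bound
  private final Object[] valueArr = new Object[5];
  // Next slot to write, next slot to read
  private int putIndex, takeIndex;
  // Number of values currently stored
  private int count;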

  public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
      // Loop rather than "if": re-check the condition after every wakeup
      while (count == valueArr.length) {
        produce.await();
      }
   
      valueArr[putIndex] = x;
      System.out.println("Adding - " + x);
      if (++putIndex == valueArr.length){
        putIndex = 0;
      }
      // increment count
      ++count;
      consume.signal();
    } finally {
      lock.unlock();
    }
  }

  public Object take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0){
        consume.await();
      }
      Object x = valueArr[takeIndex];
      // Clear the slot so the buffer does not keep a stale reference
      valueArr[takeIndex] = null;
      System.out.println("Retrieving - " + x);
      if (++takeIndex == valueArr.length){
        takeIndex = 0;
      }
      // reduce the count
      --count;
      // signal producer
      produce.signal();
      return x;
    } finally {
      lock.unlock();
    }
  }
}
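The bound of 5 and the Object element type are hard-coded in the class above. As a sketch of one possible variation, the version below takes the capacity as a constructor argument and uses generics so that take() returns a typed value. The class name BoundedBuffer, the condition names notFull and notEmpty, and the size() method are assumptions made for this illustration and are not part of the original program.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical generic variant of BufferClass with a configurable bound
class BoundedBuffer<T> {
  private final Lock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();
  private final Condition notEmpty = lock.newCondition();
  private final Object[] items;
  private int putIndex, takeIndex, count;

  BoundedBuffer(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    items = new Object[capacity];
  }

  public void put(T x) throws InterruptedException {
    lock.lock();
    try {
      // Block while the buffer is full
      while (count == items.length) {
        notFull.await();
      }
      items[putIndex] = x;
      putIndex = (putIndex + 1) % items.length;
      count++;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  public T take() throws InterruptedException {
    lock.lock();
    try {
      // Block while the buffer is empty
      while (count == 0) {
        notEmpty.await();
      }
      @SuppressWarnings("unchecked")
      T x = (T) items[takeIndex];
      items[takeIndex] = null; // drop the stale reference
      takeIndex = (takeIndex + 1) % items.length;
      count--;
      notFull.signal();
      return x;
    } finally {
      lock.unlock();
    }
  }

  // Number of stored values, read under the lock
  public int size() {
    lock.lock();
    try {
      return count;
    } finally {
      lock.unlock();
    }
  }
}
```

For example, new BoundedBuffer<String>(2) accepts two put() calls without blocking, and a third put() would wait until a take() frees a slot.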

To test BufferClass we have another class, BufferClassDemo, where two threads are created: one adds values to the buffer and the other retrieves them. Here 10 values are added. BufferClass should ensure that once 5 values are in the buffer, any attempt to add another value blocks until space is available. In the same way, if the buffer is empty, any attempt to retrieve a value should block.

 
public class BufferClassDemo {

  public static void main(String[] args) {
    BufferClass bufferClass = new BufferClass();
    // Creating two threads
    Thread producer = new Thread(new Producer(bufferClass));
    Thread consumer = new Thread(new Consumer(bufferClass));
    // starting threads
    producer.start();
    consumer.start();
  }
}

class Producer implements Runnable {
  private BufferClass bufferClass;    
  public Producer(BufferClass bufferClass){
    this.bufferClass = bufferClass;
  }
  @Override
  public void run() {
    for (int i = 1; i <= 10; i++) {
      try {
        //Thread.sleep(10);                            
        bufferClass.put(i);                            
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop producing
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}

class Consumer implements Runnable {
  private BufferClass bufferClass;    
  public Consumer(BufferClass bufferClass){
    this.bufferClass = bufferClass;
  }
  @Override
  public void run() {
    for (int i = 1; i <= 10; i++) {
      try {
        // Thread.sleep(500);
        bufferClass.take();               
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop consuming
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}

Output

Output from one of the runs. Note that the output may vary between runs, but the buffer should never hold more than 5 elements, and any attempt to add to a full buffer should block.

Adding - 1
Adding - 2
Adding - 3
Adding - 4
Adding - 5
Retrieving - 1
Retrieving - 2
Retrieving - 3
Retrieving - 4
Retrieving - 5
Adding - 6
Adding - 7
Adding - 8
Retrieving - 6
Retrieving - 7
Retrieving - 8
Adding - 9
Retrieving - 9
Adding - 10
Retrieving - 10
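The bound is hard to see directly in interleaved thread output. For comparison, the JDK's own java.util.concurrent.ArrayBlockingQueue enforces the same kind of bound, and its non-blocking offer() and poll() methods make the behavior observable without extra threads. The sketch below uses that library class, not the BufferClass from this post, and the names BoundCheck and rejectsWhenFull are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper class that observes the bound of an ArrayBlockingQueue
class BoundCheck {
  // Returns true if a queue of the given capacity refuses one extra
  // element while full and accepts it again after one element is removed
  static boolean rejectsWhenFull(int capacity) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
    for (int i = 1; i <= capacity; i++) {
      if (!queue.offer(i)) {
        return false; // every element up to the capacity should fit
      }
    }
    // offer() does not block; it returns false when the queue is full
    boolean rejected = !queue.offer(capacity + 1);
    // poll() removes the oldest element, which frees one slot
    Integer head = queue.poll();
    boolean acceptedAfterPoll = queue.offer(capacity + 1);
    return rejected && head != null && head == 1 && acceptedAfterPoll;
  }

  public static void main(String[] args) {
    System.out.println(rejectsWhenFull(5));
  }
}
```

The put() and take() methods of ArrayBlockingQueue block in the same situations where offer() and poll() return false and null, which is the behavior BufferClass reproduces by hand.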

That's all for this topic, How to Create Your Own BlockingQueue - Java Program. If you have any doubts or suggestions, please drop a comment. Thanks!
