Saturday, April 10, 2021

ConcurrentLinkedQueue in Java With Examples

ConcurrentLinkedQueue in Java implements Queue interface and it was added in Java 5 along with other concurrent utilities like CyclicBarrier, CountDownLatch, Semaphore, ConcurrentHashMap etc.

ConcurrentLinkedQueue in Java is an unbounded thread-safe queue which stores its elements as linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is the element that has been on the queue the longest time. The tail of the queue is the element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

Like most other concurrent collection implementations, ConcurrentLinkedQueue class does not permit the use of null elements.

Usage of ConcurrentLinkedQueue in Java

A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Note that it doesn't block operations as it is done in the implementations of BlockingQueue interface like ArrayBlockingQueue, LinkedBlockingQueue. So there are no put() or take() methods which will wait if required.

Java ConcurrentLinkedQueue constructors

  • ConcurrentLinkedQueue()- Creates a ConcurrentLinkedQueue that is initially empty.
  • ConcurrentLinkedQueue​(Collection<? extends E> c)- Creates a ConcurrentLinkedQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.

Java ConcurrentLinkedQueue Iterator

Iterators in ConcurrentLinkedQueue are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.

ConcurrentLinkedQueue Java Example

Let's create a producer consumer Java program using ConcurrentLinkedQueue. In this code there will be one producer thread putting element into the queue and three consumer threads retrieving elements from the queue. Note that producer thread will put only 3 elements.

You can run the program multiple times and observe that two consumer threads are not retrieving the same element from the queue.

Later ConcurrentLinkedQueue is replaced with PriorityQueue which is not synchronized. Running this code will result in consumer threads getting into a race condition and overstepping on each other. Note that sometimes you may get correct output also but in multiple runs you are bound to get incorrect results.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLQDemo {
  public static void main(String[] args) {
    Buffer buffer = new Buffer();
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // Calling one producer
    executor.execute(new ProdTask(buffer));
    // Calling three consumers
    executor.execute(new ConTask(buffer));
    executor.execute(new ConTask(buffer));
    executor.execute(new ConTask(buffer));
    executor.shutdown();
  }
}
/**
 *  
 */
class ProdTask implements Runnable{
  Buffer buffer;
  ProdTask(Buffer buffer){
    this.buffer = buffer;
  }
  @Override
  public void run() {
    // putting just three elements
    for(int i = 0; i < 3; i++){
      buffer.put(i);
    }
  }
}

/**
 * 
 */
class ConTask implements Runnable{
  Buffer buffer;
  ConTask(Buffer buffer){
    this.buffer = buffer;
  }
  @Override
  public void run() {
    try {
      // delay to make sure producer starts first
      Thread.sleep(20);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    buffer.get();
  }    
}

//Shared class used by threads
class Buffer{
  int i;
  Queue<Integer> clQueue = new ConcurrentLinkedQueue<Integer>();

  //Retrieving from the queue 
  public void get(){
    System.out.println("Consumer recd - " + clQueue.poll());
  }
  // putting in the queue
  public void put(int i){
    this.i = i;
    clQueue.add(i);
    System.out.println("Putting - " + i);
  } 
}

Output

Putting - 0
Putting - 1
Putting - 2
Consumer recd - 0
Consumer recd - 2
Consumer recd - 1

Now let's replace the ConcurrentLinkedQueue with PriorityQueue which is not synchronized.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLQDemo {
  public static void main(String[] args) {
    Buffer buffer = new Buffer();
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // Calling one producer
    executor.execute(new ProdTask(buffer));
    // Calling three consumers
    executor.execute(new ConTask(buffer));
    executor.execute(new ConTask(buffer));
    executor.execute(new ConTask(buffer));
    executor.shutdown();
  }
}

/**
 * 
 */
class ProdTask implements Runnable{
  Buffer buffer;
  ProdTask(Buffer buffer){
    this.buffer = buffer;
  }
  @Override
  public void run() {
    // putting just three elements
    for(int i = 0; i < 3; i++){
      buffer.put(i);
    }
  }
}

/**
 * 
 */
class ConTask implements Runnable{
  Buffer buffer;
  ConTask(Buffer buffer){
    this.buffer = buffer;
  }
  @Override
  public void run() {
    try {
      // delay to make sure producer starts first
      Thread.sleep(20);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    buffer.get();
  }    
}

//Shared class used by threads
class Buffer{
  int i;
  //Queue<Integer> clQueue = new ConcurrentLinkedQueue<Integer>();
  Queue<Integer> clQueue = new PriorityQueue<Integer>();

  //Retrieving from the queue 
  public void get(){
    System.out.println("Consumer recd - " + clQueue.poll());
  }
  // putting in the queue
  public void put(int i){
    this.i = i;
    clQueue.add(i);
    System.out.println("Putting - " + i);
  }  
}

Output

Putting - 0
Putting - 1
Putting - 2
Consumer recd - 0
Consumer recd - 1
Consumer recd - 0

Here it can be seen Consumer threads are getting the same element out of the queue.

That's all for this topic ConcurrentLinkedQueue in Java With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Java SynchronousQueue With Examples
  2. Java BlockingDeque With Examples
  3. ConcurrentHashMap in Java With Examples
  4. Java ReentrantReadWriteLock With Examples
  5. Java Semaphore With Examples

You may also like-

  1. Race Condition in Java Multi-Threading
  2. Fail-Fast Vs Fail-Safe Iterator in Java
  3. How HashMap Works Internally in Java
  4. @FunctionalInterface Annotation in Java
  5. Interface Static Methods in Java
  6. static Method Overloading or Overriding in Java
  7. Multi-Catch Statement in Java Exception Handling
  8. How to Inject Prototype Scoped Bean into a Singleton Bean in Spring