Tuesday, January 21, 2020

Parallel Stream in Java Stream API

You can execute streams in serial or in parallel. When a stream executes in parallel, the Java runtime partitions the stream into multiple sub-streams. Aggregate operations iterate over and process these sub-streams in parallel and then combine the results. This post shows how to obtain and use a parallel stream in Java.

With parallel streams, each sub-stream can be processed in a separate thread, and therefore on a separate core of a multi-core processor, with the partial results combined later to give the final result. As a result, bulk operations on large amounts of data can often be completed in less time than with a sequential stream.


How to get a parallel stream in Java

The Collection interface has the methods Collection.stream() and Collection.parallelStream(), which return a sequential stream and a (possibly) parallel stream respectively.

You can also call the parallel() method on a sequential stream to get a parallel stream. The parallel() method is defined in the BaseStream interface.

For example, if you have an Employee class with a constructor that takes an int, and for some testing you want to create 1000 Employee objects, you can use the parallel() method with a range:

List<Employee> empList = IntStream.rangeClosed(1, 1000).parallel().mapToObj(Employee::new).collect(Collectors.toList());
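To check which mode a stream is in, BaseStream also provides isParallel() and sequential(). The following is a minimal sketch of that; the class and method names are made up for illustration and are not part of the original post.

```java
import java.util.Arrays;
import java.util.List;

class ParallelFlagSketch {

    // Sequential stream switched to parallel mode with parallel()
    static boolean viaParallelMethod(List<Integer> data) {
        return data.stream().parallel().isParallel();
    }

    // Stream requested directly in parallel mode
    static boolean viaParallelStream(List<Integer> data) {
        return data.parallelStream().isParallel();
    }

    // sequential() switches the whole pipeline back to sequential mode
    static boolean backToSequential(List<Integer> data) {
        return data.parallelStream().sequential().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        System.out.println(viaParallelMethod(data));   // true
        System.out.println(viaParallelStream(data));   // true for the standard list implementations
        System.out.println(backToSequential(data));    // false
    }
}
```

Note that the mode is a property of the whole pipeline, not of individual operations, so the last parallel() or sequential() call before the terminal operation decides how the stream runs.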

Things to consider when using Java parallel streams

Since partial results are computed by separate threads and combined later, it is important to think about the following points:

  1. Is a separate combiner needed to combine the partial results, or can the aggregate function itself work as the combiner too?
  2. Since multiple threads are involved, no operation in the parallel stream should update a shared variable.
  3. Most of the time a collection is the data source of the stream, and collections are generally not thread-safe. This means that multiple threads cannot modify a collection without introducing thread interference errors.
  4. Parallel streams use the common fork-join thread pool. A long-running, performance-intensive task may therefore occupy all threads in the pool, blocking other tasks that use parallel streams because no threads are available.
  5. Parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores. While aggregate operations make parallelism easier to implement, it is still your responsibility to determine whether your application is suitable for it.
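The first and fourth points can be illustrated with a short sketch. It is only an illustration under assumed names (CombinerSketch, totalLength and total are hypothetical): the three-argument reduce() needs a separate combiner because the accumulator mixes two types, while the two-argument form reuses the same function for both roles. The last line prints the parallelism of the common fork-join pool, which depends on the machine.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

class CombinerSketch {

    // The accumulator adds a String's length to an int partial result, so it cannot
    // merge two partial results; Integer::sum is supplied as the separate combiner.
    static int totalLength(List<String> names) {
        return names.parallelStream()
                    .reduce(0, (partial, name) -> partial + name.length(), Integer::sum);
    }

    // Element type and result type are the same, so one function acts as
    // accumulator and combiner.
    static int total(List<Integer> values) {
        return values.parallelStream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Ram", "Sheila", "Mukesh", "Rani");
        System.out.println(totalLength(names));                  // 19
        System.out.println(total(Arrays.asList(1, 2, 3, 4)));    // 10
        // Number of worker threads the common pool targets (machine dependent)
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}
```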

Java parallel streams examples

Let’s first see a simple example using a parallel stream in Java where you need to find the maximum salary among the employees.

Employee class

public class Employee {
    private String lastName;
    private String firstName;
    private String empId;
    private int age;
    private int salary;
    public String getLastName() {
        return lastName;
    }
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
    public String getFirstName() {
        return firstName;
    }
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }
    public String getEmpId() {
        return empId;
    }
    public void setEmpId(String empId) {
        this.empId = empId;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    
    public String getFullName(){
     return this.firstName + " " + this.lastName;
    }
   
    public int getSalary() {
      return salary;
    }
    public void setSalary(int salary) {
      this.salary = salary;
    }
}

Using parallel stream

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

public class ParallelDemo1 {

    public static void main(String[] args) {
        // getting list of employee 
        List<Employee> empList = createList();
        OptionalInt maxSalary = empList.parallelStream().mapToInt(e -> e.getSalary()).max();
        if(maxSalary.isPresent()){
            System.out.println("Max Salary " + maxSalary.getAsInt());
        }
    }
    
    // Stub method to create list of employee objects
    private static List<Employee> createList(){
        List<Employee> empList = new ArrayList<Employee>();
        Employee emp = new Employee();
        emp.setEmpId("E001");
        emp.setAge(40);
        emp.setFirstName("Ram");
        emp.setLastName("Chandra");
        emp.setSalary(5000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E002");
        emp.setAge(35);
        emp.setFirstName("Sheila");
        emp.setLastName("Baijal");
        emp.setSalary(7000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E003");
        emp.setAge(24);
        emp.setFirstName("Mukesh");
        emp.setLastName("Rishi");
        emp.setSalary(9000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E004");
        emp.setAge(37);
        emp.setFirstName("Rani");
        emp.setLastName("Mukherjee");
        emp.setSalary(10000);
        empList.add(emp);
        return empList;
    }
}

Output

Max Salary 10000
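If you need the employee object itself rather than just the salary value, one option is Stream.max() with a Comparator. The sketch below assumes a trimmed-down stand-in class named Emp (hypothetical, not the Employee class of this post) so that it is self-contained.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class MaxEmployeeSketch {

    // Hypothetical, minimal stand-in for the Employee class
    static class Emp {
        private final String name;
        private final int salary;

        Emp(String name, int salary) {
            this.name = name;
            this.salary = salary;
        }

        String getName() { return name; }
        int getSalary() { return salary; }
    }

    // Returns the employee with the highest salary, or an empty Optional for an empty list
    static Optional<Emp> highestPaid(List<Emp> emps) {
        return emps.parallelStream().max(Comparator.comparingInt(Emp::getSalary));
    }

    public static void main(String[] args) {
        List<Emp> emps = Arrays.asList(new Emp("Ram", 5000), new Emp("Sheila", 7000),
                new Emp("Mukesh", 9000), new Emp("Rani", 10000));
        highestPaid(emps).ifPresent(e ->
                System.out.println(e.getName() + " " + e.getSalary()));   // Rani 10000
    }
}
```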

Java Parallel Stream Mistake: updating a shared variable

As stated above, updating shared state from a parallel stream may cause problems because several threads are involved. Let’s see it with an example.

There is an Employee class, and 500 objects of the Employee class are stored in a list. Using a parallel stream, you then try to get the total salary paid to all employees.

Employee class

The Employee class is the same as the one used above, with one difference: there is now a constructor with an int argument, which is used to set the salary property. Using the rangeClosed method, 500 objects are created with salaries set to 1..500.

public class Employee {
    private String lastName;
    private String firstName;
    private String empId;
    private int age;
    private int salary;
    
    Employee(int salary){
     this.salary = salary;
    }
    
    public String getLastName() {
        return lastName;
    }
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
    public String getFirstName() {
        return firstName;
    }
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }
    public String getEmpId() {
        return empId;
    }
    public void setEmpId(String empId) {
        this.empId = empId;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    
    public String getFullName(){
     return this.firstName + " " + this.lastName;
    }
   
    public int getSalary() {
      return salary;
    }
}

Total Salary Calculation

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelDemo {

    public static void main(String[] args) {
        Salary sal = new Salary();
        List<Employee> empList = createList();
        empList.parallelStream().forEach(sal::doProcess);
        
        System.out.println("Total - " + sal.getTotalSalary());

    }
    // Stub method to create list of employee objects
    private static List<Employee> createList(){
        List<Employee> empList = IntStream.rangeClosed(1, 500).mapToObj(Employee::new)
          .collect(Collectors.toList());
        return empList;
    }
}

class Salary{
    //shared variable
    private int total = 0;
    
    public void doProcess(Employee emp){
        addSalary(emp.getSalary());
    }
    
    public void addSalary(int salary){
        total = total + salary;
    }
    public int getTotalSalary(){
        return total;
    }
}

Output

Total - 113359
Total - 125250
Total - 120901
Total - 123835
Total - 125250

I got these five outputs from running the program five times. You can see that the output varies between runs (the correct total is 125250). This is because total is a shared variable that is updated from the parallel stream by several threads without any synchronization, so some updates are lost.

If you looked at the first parallel stream example above, you probably already have an idea of a better way to do it.

public class ParallelDemo {

    public static void main(String[] args) {
        Salary sal = new Salary();
        List<Employee> empList = createList();
        //empList.parallelStream().forEach(sal::doProcess);
        int totalSalary = empList.parallelStream().mapToInt(e -> e.getSalary()).sum();
        sal.addSalary(totalSalary);
        System.out.println("Total - " + sal.getTotalSalary());

    }
    // Stub method to create list of employee objects
    private static List<Employee> createList(){
        List<Employee> empList = IntStream.rangeClosed(1, 500).mapToObj(Employee::new)
         .collect(Collectors.toList());
        return empList;
    }
}

Note that here you are not changing the shared variable inside the stream. You are getting the salaries of the employees and summing them. That way there is no problem even with a parallel stream, because different threads get different data to add, and those partial results are then combined to get the total salary.
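If a shared accumulator really cannot be avoided, it has to be thread-safe. The following sketch, with hypothetical class and method names and plain Integer salaries instead of Employee objects, contrasts an AtomicInteger accumulator with the reduction used above. Both give the correct total, but the reduction is usually preferable since the threads do not contend for a single variable.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SharedTotalSketch {

    // Shared accumulator made safe: addAndGet is an atomic read-modify-write
    static int totalWithAtomic(List<Integer> salaries) {
        AtomicInteger total = new AtomicInteger();
        salaries.parallelStream().forEach(total::addAndGet);
        return total.get();
    }

    // No shared state at all: each thread sums its own part, partial sums are combined
    static int totalWithReduction(List<Integer> salaries) {
        return salaries.parallelStream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> salaries = IntStream.rangeClosed(1, 500).boxed()
                .collect(Collectors.toList());
        System.out.println(totalWithAtomic(salaries));      // 125250
        System.out.println(totalWithReduction(salaries));   // 125250
    }
}
```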

Using Collectors.groupingByConcurrent with Parallel Streams

The groupingBy operation can perform poorly with parallel streams. This is because it operates by merging two maps by key, which is computationally expensive. With a parallel stream in Java, consider using the groupingByConcurrent operation instead of groupingBy; it returns an instance of ConcurrentMap instead of Map.

For example, if you have an Employee class and you want to group employees by sex, it can be done as follows:

import java.util.Arrays;
import java.util.List;

import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class ParallelDemo1 {

    public static void main(String[] args) {
        ParallelDemo1 pd = new ParallelDemo1();
        // getting list of employee 
        List<Employee> empList = pd.createList();
        
        // Employees are grouped by sex, so the map is named accordingly
        ConcurrentMap<Character, List<Employee>> bySex = empList.parallelStream()
         .collect(Collectors.groupingByConcurrent(e -> e.sex));
        bySex.forEach((key, employees)->{
            System.out.println("Key- " + key + " Value ");
            employees.forEach(v->System.out.println(v.name));
        });      
    }
       
    // Stub method to create list of employee objects
    private List<Employee> createList(){
        List<Employee> empList = Arrays.asList(new Employee("E001", 40, "Ram", 'M', 5000), 
        new Employee("E002", 35, "Sheila", 'F', 7000), 
        new Employee("E003", 24, "Mukesh", 'M', 9000), 
        new Employee("E004", 37, "Rani", 'F', 10000));
        
        return empList;
    }

    class Employee {
        private String empId;
        private int age;
        private String name;
        private char sex;
        private int salary;
        Employee(String empId, int age, String name, char sex, int salary){
            this.empId = empId;
            this.age = age;
            this.name = name;
            this.sex = sex;
            this.salary = salary;
        }        
    }
}

Output

Key- F Value 
Sheila
Rani
Key- M Value 
Mukesh
Ram
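In the output above, Mukesh is listed before Ram even though Ram comes first in the source list; groupingByConcurrent is an unordered collector, so the order of elements inside each group is not guaranteed. When the order inside a group does not matter, for example when only counting, the result is deterministic. Here is a small sketch of that, using a hypothetical stand-in class Emp:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class GroupingSketch {

    // Hypothetical, minimal stand-in for the Employee class
    static class Emp {
        private final String name;
        private final char sex;

        Emp(String name, char sex) {
            this.name = name;
            this.sex = sex;
        }

        String getName() { return name; }
        char getSex() { return sex; }
    }

    // Counts employees per group; all threads accumulate into one concurrent map
    static ConcurrentMap<Character, Long> countBySex(List<Emp> emps) {
        return emps.parallelStream()
                   .collect(Collectors.groupingByConcurrent(Emp::getSex, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Emp> emps = Arrays.asList(new Emp("Ram", 'M'), new Emp("Sheila", 'F'),
                new Emp("Mukesh", 'M'), new Emp("Rani", 'F'));
        ConcurrentMap<Character, Long> counts = countBySex(emps);
        System.out.println("F: " + counts.get('F'));   // F: 2
        System.out.println("M: " + counts.get('M'));   // M: 2
    }
}
```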

Using forEachOrdered with Parallel Streams

The order in which a pipeline processes the elements of a stream depends on whether the stream is executed in serial or in parallel.

For example:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
System.out.println("listOfIntegers:");
listOfIntegers.stream().forEach(e -> System.out.print(e + " "));

Output

listOfIntegers:
1 2 3 4 5 6 7 8

Here you can see that the pipeline prints the elements of the list listOfIntegers in the order that they were added to the list.

With a parallel stream:

System.out.println("Parallel stream:");
listOfIntegers.parallelStream().forEach(e -> System.out.print(e + " "));

Output

Parallel stream:
3 4 1 6 2 5 7 8

Here you can see that the pipeline prints the elements of the list in an apparently random order. When you execute a stream in parallel, the Java compiler and runtime determine the order in which to process the stream's elements to maximize the benefits of parallel computing unless otherwise specified by the stream operation.

Using forEachOrdered:

System.out.println("With forEachOrdered:");
listOfIntegers.parallelStream().forEachOrdered(e -> System.out.print(e + " "));

Output

With forEachOrdered:
1 2 3 4 5 6 7 8

The forEachOrdered method used here processes the elements of the stream in the order specified by its source, regardless of whether the stream is executed in serial or in parallel. Keep in mind that you may lose the benefits of parallelism if you use operations like forEachOrdered with parallel streams.
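As a further sketch (class and method names are hypothetical), forEachOrdered can even append to a plain StringBuilder, because the action for one element is guaranteed to happen before the action for the next. Collecting with Collectors.toList() also keeps the encounter order of an ordered source such as a List, even when the stream is parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class EncounterOrderSketch {

    // forEachOrdered runs the action element by element in encounter order
    static String orderedJoin(List<Integer> values) {
        StringBuilder sb = new StringBuilder();
        values.parallelStream().forEachOrdered(v -> sb.append(v).append(' '));
        return sb.toString().trim();
    }

    // map() may run in parallel, but the collected list keeps the encounter order
    static List<Integer> doubled(List<Integer> values) {
        return values.parallelStream().map(v -> v * 2).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(orderedJoin(values));   // 1 2 3 4 5 6 7 8
        System.out.println(doubled(values));       // [2, 4, 6, 8, 10, 12, 14, 16]
    }
}
```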

Points to note

  1. Parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores.
  2. Using a parallel stream in Java adds the overhead of splitting the data, managing multiple threads and combining the partial results.
  3. Make sure there is enough data to process, so that the time saved by processing it in parallel outweighs this overhead.
  4. Performance of parallel execution also depends upon the number of processors available.
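One way to check these points for your own workload is simply to time both variants. The sketch below is a crude wall-clock comparison with made-up names and an arbitrary per-element computation, not a rigorous benchmark; the numbers will vary with the machine, JIT warm-up and data size, and a harness such as JMH is better suited for serious measurements.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

class TimingSketch {

    // Same computation, executed sequentially
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).map(x -> x * x % 7).sum();
    }

    // Same computation, executed in parallel
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().map(x -> x * x % 7).sum();
    }

    // Crude wall-clock timing in milliseconds
    static long timeMillis(LongSupplier task) {
        long start = System.nanoTime();
        task.getAsLong();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long n = 50_000_000L;   // arbitrary size chosen for illustration
        System.out.println("Cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Sequential ms: " + timeMillis(() -> sequentialSum(n)));
        System.out.println("Parallel ms: " + timeMillis(() -> parallelSum(n)));
    }
}
```

Try lowering n to a few hundred elements; with so little data the parallel version may well be the slower one.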

That's all for this topic Parallel Stream in Java Stream API. If you have any doubt or any suggestions to make please drop a comment. Thanks!
