In programming language terminology Concurrency term refers to Multithreading. Basically, multithreading is a type of execution model which will allow multiple threads which exist within the context of a process so that they all can execute independently but they can share their process resources.
In Java, JVM (Java Virtual Machine) has been designed to support concurrent programming and these all executions will take place in the context of threads. In Java, memory model describes how threads in the Java programming language can interact through memory. In this modern platform, basically the code is frequently not executed in the order it was written. It can be reordered by the compiler, processor and memory can achieve maximum utilized performance.
In this article, I will easily explain you how concurrency enhanced in java 8 with proper programming concepts and examples.
For the enhancement of concurrency in Java 8, java has introduced two new interfaces and four new classes in java.util.concurrent package.
Interfaces | Classes |
1. CompletableFuture.AsynchronousCompletionTask 2. CompletionStage<T> | 1. CompletableFuture<T> 2. ConcurrentHashMap.KeySetView<K,V> 3. CountedCompleter<T> 4. CompletionException |
In programming language terminology Concurrency term refers to Multithreading. Basically, multithreading is a type of execution model which will allow multiple threads which exist within the context of a process so that they all can execute independently but they can share their process resources.
The Java memory model describes how threads in the Java programming language interact through memory. In modern platforms, code is frequently not executed in the order it was written. It is reordered by the compiler, the processor and the memory subsystem to achieve maximum performance
In Java, JVM (Java Virtual Machine) has been designed to support concurrent programming and all these executions will take place in the context of threads. In Java, memory model describes how threads in the Java programming language can interact through memory. In this modern platform, basically the code is frequently not executed in the order it was written. It can be reordered by the compiler, processor and memory can achieve maximum utilized performance.
The aim of this article is not to make you masters in Java 8 Concurrency, but to help
you guide towards that goal. Sometimes it can help just to know that there is some API that
might be suitable for a particular situation.
A brief introduction on Enhancement Over Concurrency in java:
Java 1.0 : Thread, Runnable
java 1.2 : ThreadLocal
java 5 : Executor framework
java 6 : Minimal changes related to concurrency
java 7: Fork/Join framework and the Phaser
java 8 : introduced Stream API along with many other classes
Important Java 8 concurrency enhancements:
- Stream API, Lambda expressions and Parallel streams
- Stamped Lock
- Parallel sort for arrays
- Default ForkJoinPool: Common pool.
- CountedCompleter
- CompletableFuture
- Double Added, LongAdder, DoubleAccumulator, LongAccumulator.
- New methods in Collection, ConcurrentMap and ConcurrentHashMap
- Working With parallel stream:
- Stream() vs. parallelStream() in Collection interface
- Arrays do no have a parallelStream() method.
- parallel() vs. sequential()
- Parallel streams internally uses fork/join framework
- Elements may be processed in any order in parallel streams
- Avoid using stateful operations or that based on order within parallel streams
- Operations that depend on previous state like a loop iteration that is based on previous.
eg. Sorted, findFirst.
Program to illustrate the performance using stream vs parallel stream
Student: Its a Dto class which will have information about student details objects.
package com.advance.computing.java8.concurrency;
public class Student {
private int roll;
private String name;
private String subject;
private int marks;
public Student() {
super();
}
public Student(int roll, String name, String subject, int marks) {
super();
this.roll = roll;
this.name = name;
this.subject = subject;
this.marks = marks;
}
public int getRoll() {
return roll;
}
public void setRoll(int roll) {
this.roll = roll;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
public int getMarks() {
return marks;
}
public void setMarks(int marks) {
this.marks = marks;
}
@Override
public String toString() {
return "Student [roll=" + roll + ", name=" + name + ", subject=" + subject + ", marks=" + marks + "]";
}
}
ConcurrencyStreamTest: Class contain business logic where student marks sorted using sequential stream and parallel stream.
package com.advance.computing.java8.concurrency;
package com.advance.computing.java8.concurrency;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
public class ConcurrencyStreamTest {
public static void main(String args[]) {
List studentList = new ArrayList();
studentList.add(new Student(111, "AAA", "CS", 80));
studentList.add(new Student(222, "BBB", "CS", 70));
studentList.add(new Student(333, "CCC", "CS", 60));
studentList.add(new Student(444, "DDD", "IT", 50));
studentList.add(new Student(555, "EEE", "IT", 65));
studentList.add(new Student(666, "EEE", "IT", 75));
System.out.println("Original List\n"); studentList.forEach(System.out::println); /* Apply sequential stream for list of object */ long startTime = System.currentTimeMillis(); List sortedItems = studentList.stream().sorted(Comparator.comparing(Student::getMarks)) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); System.out.println("\nSorting marks using sequential stream: \n"); sortedItems.forEach(System.out::println); System.out.println("Total time taken in process : " + (endTime - startTime) + " milisec."); /* Using parallel stream */ startTime = System.currentTimeMillis(); List anotherSortedItems = studentList.parallelStream().sorted(Comparator.comparing(Student::getMarks)) .collect(Collectors.toList()); endTime = System.currentTimeMillis(); System.out.println("\nSorting marks using parallel stream:\n "); anotherSortedItems.forEach(System.out::println); System.out.println("Total the time taken process : " + (endTime - startTime) + " milisec."); Integer totalMarks = studentList.parallelStream().map(e -> e.getMarks()).reduce(0, (a1, a2) -> a1 + a2); System.out.println("Total Marks: " + totalMarks); }
}
Output:
Original List
Student
[roll=111, name=AAA, subject=CS, marks=80]
Student [roll=222, name=BBB, subject=CS, marks=70] Student [roll=333, name=CCC, subject=CS, marks=60] Student [roll=444, name=DDD, subject=IT, marks=50] Student [roll=555, name=EEE, subject=IT, marks=65] Student [roll=666, name=EEE, subject=IT, marks=75]
Sorting marks using sequential stream:
Student
[roll=444, name=DDD, subject=IT, marks=50]
Student [roll=333, name=CCC, subject=CS, marks=60] Student [roll=555, name=EEE, subject=IT, marks=65] Student [roll=222, name=BBB, subject=CS, marks=70] Student [roll=666, name=EEE, subject=IT, marks=75] Student [roll=111, name=AAA, subject=CS, marks=80]
Total time taken in process : 7 milisec.
Sorting marks using parallel stream:
Student
[roll=444, name=DDD, subject=IT, marks=50]
Student [roll=333, name=CCC, subject=CS, marks=60] Student [roll=555, name=EEE, subject=IT, marks=65] Student [roll=222, name=BBB, subject=CS, marks=70] Student [roll=666, name=EEE, subject=IT, marks=75] Student [roll=111, name=AAA, subject=CS, marks=80]
Total the time taken process : 1 milisec.
Total Marks: 400
Note:
If
you want particular job using multiple threads in parallel cores, then only you
need to call parallelStream() method instead of stream() method. The execution
time will vary according to the system performance because parallelism is not
automatically faster than performing operations , although it can be if you
have enough data and processor cores.
Basically , parallelStream() method should be use when the output of the
operation won’t needed to be dependent on the order of elements present in
collection object.
2. WORKING WITH ATOMIC VARIABLES
2.1 :
DoubleAdder : preferable to alternatives when frequently updated
but less frequently read
2.2 : LongAdder : under high contention, expected throughput of this class is
significantly higher compared to AtomicLong, at the expense of higher space
consumption.
2.3 : DoubleAccumulator : preferable to alternatives when frequently updated but less
frequently read
2.4 : LongAccumulator : under high contention, expected throughput of this
class is significantly higher compared to AtomicLong, at the expense of higher
space consumption.
3. Working with Executor Service
3.1 : ExecutorService
: The executor services are one of the most
important feature of the Concurrency API in java 8. We can do how to execute
code in parallel via tasks and executor services where in Java 8 introduces
ExecutorService interface.
3.2 : Basically ExecutorService is a higher-level replacement for
working with threads directly.
3.3 : Executors are capable of running asynchronous tasks and typically
manage a pool of threads, so there is no need to create new threads manually.
3.4 : ExecutorService interface has the following implementations in the
java.util.concurrent package:
3.4.1 : ThreadPoolExecutor
3.4.2 : ScheduledThreadPoolExecutor
Java has introduces
two new interfaces and Four new classes for the enhancement of concurrency in
java 8 under
java.util.concurrent package
Interfaces:
1.
CompletableFuture.AsynchronousCompletionTask: A marker interface identifying asynchronous tasks
produced by async methods.
2. CompletionStage: It’s a stage of a possibly asynchronous computation,
which performs an action or computes a value when another
CompletionStage completes
Classes:
1.
CompletableFuture: It’s a Future that may be
explicitly completed (setting its value and status), and may be used as a
CompletionStage, supporting dependent functions and actions that trigger upon
its completion.
2. ConcurrentHashMap.KeySetView: It is a view of a ConcurrentHashMap as
a Set of keys, in which additions may optionally be enabled by
mapping to a common value.
3. CountedCompleter: A ForkJoinTask with a completion action
performed when triggered and there are no remaining pending actions.
4. CompletionException: It will throw exception when an error or other
exception is encountered in the course of completing a result or task.
(Note: For the details of concurrency classes and interfaces i’ll explain use cases and examples in my next article which will coming soon.)
Some important guidelines we should follow for Where and How we can use concurrency using java 8:
- Identify correct independent tasks.
- Find easily parallelizable version of algorithm.
- Use right identity while using reduce() with parallel streams.
- Avoid using stateful operations within parallel streams.
- Intermediate operations are not executed until a terminal operation (for all streams)
- Collect might be more efficient that reduce in most cases (for all streams)
- Iterate() method of Stream interface should be avoided as much as possible.
- SplittableRandom class is more suitable in parallel processing than Random class.
- Should use immutable objects as Identity object.
- All threads share the identity object in case of parallel streams.
- Parallelism threshold parameter of Concurrency methods should be used wisely
eg: search(), reduce(), compute etc. of ConcurrentHashMap