3/17/2020
Java Concurrency
Threads
In concurrent programming, there are two basic units of execution: processes and
threads.
A computer usually runs many active threads and processes at once, even if it has only a single-core
processor. This is achieved through the OS feature of time-slicing.
In Java, concurrent programming is concerned with threads.
Threads vs Processes
Process
- Synonymous with program or application
- Self-contained execution environment (own memory space)
- Inter Process Communication (IPC) resources, such as pipes and sockets
- Most implementations of the Java VM run as a single process.
Thread
- Lightweight process
- Exists within a process (each process has at least one thread)
- Threads share resources (memory and files) => communication problems
- Multithreading is a feature supported by the Java platform.
Threads in Java
"Java threads are objects like any other Java objects. Threads are instances of class java.lang.Thread,
or instances of subclasses of this class. In addition to being objects, java threads can also execute
code."
In order to define a thread:
Extend Thread
Implement Runnable
Each thread overrides the run() method => this will be executed during the thread lifecycle
Threads in Java
Threads Lifecycle
(From http://www.javatpoint.com/life-cycle-of-a-thread)
Threads in Java
Lifecycle:
Starting a thread
  Create a thread
  Call the start() method on it => starts executing the run() method
Terminating a thread
  When it exits the run() method => the thread is dead => once it is dead it cannot be restarted
Pausing, suspending and resuming a thread
  Suspend for a certain amount of time using the sleep(millis) method
  Use the wait() method and the notify mechanism
Thread cleanup
  As long as some other active object holds a reference to the terminated thread
  object, other threads can execute methods on the terminated thread and
  retrieve that information.

class MyClass extends Thread {
    public void run() {
        // "condition" is a placeholder for the loop's exit test
        while (condition) {
            // code for processing
            try {
                sleep(1000); // pause for one second
            } catch (InterruptedException e) {
                return; // interrupted while sleeping: leave run()
            }
        }
    }
}
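The lifecycle rules above can be observed through Thread.getState(). The following is a minimal sketch; the class LifecycleDemo and its helper methods are made up for illustration and are not part of the slides.

```java
class LifecycleDemo {
    /** Returns the thread's state before start() and after run() has finished. */
    static Thread.State[] observeStates() {
        Thread t = new Thread(() -> { /* run() returns immediately */ });
        Thread.State before = t.getState();      // NEW: created but not started
        t.start();
        try {
            t.join();                            // wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Thread.State[] { before, t.getState() };
    }

    /** Returns true if starting a dead thread a second time is rejected. */
    static boolean restartFails() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            t.start();                           // a dead thread cannot be restarted
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Thread.State[] states = observeStates();
        System.out.println(states[0] + " -> " + states[1]);
        System.out.println("restart rejected: " + restartFails());
    }
}
```

The terminated Thread object is still an ordinary object here: getState() can be called on it after run() has returned, which matches the cleanup note above.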
Threads in Java
1. Create a thread by providing a Runnable object
The Runnable interface defines a single method named run. The method will be implemented by the class
implementing the interface and it will include the code that will be executed in the thread.
class MyClass implements Runnable {
    public void run() {
        //Display info about this particular thread
        System.out.println(Thread.currentThread());
    }
}
…
Thread t = new Thread(new MyClass());
t.start();
The Runnable object is passed to the Thread constructor

2. Create a thread by extending the Thread class
The Thread class itself implements Runnable - its run method does not do anything. A class can extend
the Thread class and can provide an implementation of the run method.
class MyClass extends Thread {
    public void run() {
        //Display info about this particular thread
        System.out.println(Thread.currentThread());
    }
}
…
MyClass t = new MyClass();
t.start();
Thread vs Runnable
Inheritance Option
Extends Thread => cannot have other inheritance
Reusability
Implements Runnable => contains only the functionality we want in the run method
"extends Thread" contains both thread and job specific behavior code
Object Oriented Design
Implements Runnable => Composition => A thread has-a Runnable behavior.
"extends Thread" is not a good Object Oriented practice.
Loosely Coupled
Implements Runnable => loosely coupled => splits code into 2 parts: behavior and thread
Extends thread => tightly coupled
Functions overhead
"extends Thread" means inheriting all the functions of the Thread class, which we may not need
Threads – pausing execution
Thread.sleep()
Used to suspend the execution of a running thread for a specified duration
The current thread is put in a timed waiting state until the sleep time ends
public class App {
    public static void printMessages() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            System.out.println("Sending message number " + i);
            Thread.sleep(4000);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        App.printMessages();
    }
}
InterruptedException is thrown in case the current thread is interrupted by another thread
while sleep is active.
The time for suspending the execution of a running thread must be given in milliseconds and
must not be negative. In the example, the thread's execution is suspended for 4 seconds.
Threads – Interrupts
A thread t in the waiting or sleeping state can be interrupted by calling the interrupt method declared in the
Thread class => the thread t will exit the wait/sleeping state and will throw an InterruptedException
The interrupted thread t must handle its interruption using one of the methods below [1]

The thread is invoking methods that throw InterruptedException =>
returns from the run method after catching the exception
public class MyThread implements Runnable{
    public void run() {
        for (int i=0; i< 10; i++){
            System.out.println( "Sending message number " + i);
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                System.out.println("Someone interrupted me!");
                return;
            }
        }
    }
}
…
Thread thread = new Thread(new MyThread());
thread.start(); thread.interrupt();

The thread is not invoking methods that throw InterruptedException =>
it will periodically invoke Thread.interrupted to check if an interrupt has been received
public class MyThread implements Runnable{
    public void run() {
        for (int i=0; i< 10; i++){
            System.out.println( "Sending message number " + i);
            if(Thread.interrupted()){
                System.out.println("Someone interrupted me!");
                return;
            }
        }
    }
}
…
Thread thread = new Thread(new MyThread());
thread.start(); thread.interrupt();
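To check from the calling side that an interrupt really cut a sleep short, the worker can record what happened. This is a sketch under assumptions: InterruptDemo and the AtomicBoolean flag are illustrative names, not part of the slides.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    /** Starts a sleeping worker, interrupts it, and reports whether it noticed and stopped. */
    static boolean interruptSleepingWorker() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);        // long sleep, expected to be cut short
            } catch (InterruptedException e) {
                sawInterrupt.set(true);      // sleep ended early: return from run()
            }
        });
        worker.start();
        worker.interrupt();                  // also works if sleep has not begun yet
        try {
            worker.join(5_000);              // give the worker time to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker was interrupted: " + interruptSleepingWorker());
    }
}
```

If the interrupt arrives before the worker reaches sleep, the interrupt status stays set and sleep throws as soon as it is called, so the outcome is the same.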
Scheduled Execution [2]
Timers - used to schedule the specified task for repeated fixed-delay execution, beginning after the
specified delay.
Steps for scheduling a task using Timer
Step 1: Create a subclass of the TimerTask class and override the run method by specifying the
instructions to be executed.
Step 2: Create a Timer object. Each Timer object has a corresponding background thread that will
execute the timer's tasks sequentially
Step 3: Create an object of the subclass created at Step 1.
Step 4: Plan the execution of the object created at Step 3 using the schedule methods from the
Timer class.

import java.util.Timer;
import java.util.TimerTask;

public class SendingMessageTask extends TimerTask { //Step 1
    private String message;
    public SendingMessageTask(String aMessage){
        this.message = aMessage;
    }
    @Override
    public void run() {
        System.out.println("Sending the message " + this.message);
    }
}
…
Timer aTimer = new Timer(); //Step 2
SendingMessageTask sendingMessageTask = new SendingMessageTask("Hello"); //Step 3
//Step 4: schedule the task for repeated fixed-delay executions, e.g. delay = 1000
//milliseconds, time between successive task executions = 2000 milliseconds
aTimer.schedule(sendingMessageTask, 1000, 2000);
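A repeating Timer task keeps running until the timer is cancelled. The sketch below follows the four steps and then stops the timer after a fixed number of executions; TimerDemo, runRepeated and the CountDownLatch used for counting are assumptions added for illustration.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimerDemo {
    /** Runs a task repeatedly until it has executed `times` times; true if that happened in time. */
    static boolean runRepeated(int times, long delayMillis, long periodMillis) {
        CountDownLatch remaining = new CountDownLatch(times);
        Timer timer = new Timer(true);               // true = daemon background thread
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                System.out.println("Sending the message Hello");
                remaining.countDown();               // one execution done
            }
        };
        timer.schedule(task, delayMillis, periodMillis);  // fixed-delay repetition
        try {
            return remaining.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();                          // stop the timer's background thread
        }
    }

    public static void main(String[] args) {
        System.out.println("ran 3 times: " + runRepeated(3, 100, 200));
    }
}
```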
Thread Safety
1) Stateless objects – no state for that class
I.e. a class that has no instance vars => its state cannot change by running
methods in different threads
Local vars (in method) are independent per thread (each has its own stack)
=> Thread Safety
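A stateless class can be shared between threads without any locking. In this sketch, SumCalculator and StatelessDemo are hypothetical names; the only data the method touches are its parameter and local variables.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stateless: no instance or static fields, only parameters and locals.
class SumCalculator {
    long sumUpTo(int n) {
        long total = 0;                      // local variable: lives on the calling thread's stack
        for (int i = 1; i <= n; i++) {
            total += i;
        }
        return total;
    }
}

class StatelessDemo {
    /** Calls one shared calculator from several threads; returns how many got the right answer. */
    static int correctResults(int threads) {
        SumCalculator shared = new SumCalculator();
        AtomicInteger correct = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                if (shared.sumUpTo(1000) == 500500L) {
                    correct.incrementAndGet();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return correct.get();
    }

    public static void main(String[] args) {
        System.out.println(correctResults(8) + " of 8 threads computed 500500");
    }
}
```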
Thread Safety
2) Volatile variables
"The Java volatile keyword is used to mark a Java variable as "being stored in main
memory". More precisely that means, that every read of a volatile variable will be
read from the computer's main memory, and not from the CPU cache, and that
every write to a volatile variable will be written to main memory, and not just to the
CPU cache."
http://tutorials.jenkov.com/java-concurrency/volatile.html
=> Thread Safety (visibility of single reads and writes only; compound actions such as i++ still need atomicity)
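A typical use of volatile is a stop flag written by one thread and read by another. The sketch below assumes a hypothetical class VolatileFlagDemo; without volatile, the worker would not be guaranteed to see the update.

```java
class VolatileFlagDemo {
    // volatile: a write by one thread is visible to later reads by other threads
    private static volatile boolean running = true;

    /** Starts a worker that spins on the flag, clears the flag, and reports whether the worker stopped. */
    static boolean stopWorker() {
        running = true;
        Thread worker = new Thread(() -> {
            while (running) {
                Thread.yield();              // busy-wait until the flag changes
            }
        });
        worker.start();
        running = false;                     // single write: no compound action involved
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopWorker());
    }
}
```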
Thread Safety
3) Locks
- guard a shared resource against concurrent access or modification
- guard resources in a block using synchronized
- guard resources across blocks using ReentrantLock
- let threads wait until a number of operations have completed: CountDownLatch
=> Thread Safety

synchronized
synchronized(MyClass.class){
    // some code
}
Or
synchronized(this){
    // some code
}

ReentrantLock
private ReentrantLock lock;
public void foo() { ...
    lock.lock();
...}
public void bar() {...
    lock.unlock();
...}

CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
public void foo(){...
    latch.countDown();
...}
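When lock() and unlock() sit in the same method, the usual idiom is to release the lock in a finally block so it is freed even if the guarded code throws. A minimal sketch, with LockedCounter and LockDemo as illustrative names; the CountDownLatch is used only to wait for the workers to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int value;

    void increment() {
        lock.lock();
        try {
            value++;                         // guarded compound action
        } finally {
            lock.unlock();                   // always release, even on exceptions
        }
    }

    int get() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }
}

class LockDemo {
    /** Increments one shared counter from several threads and returns the final value. */
    static int countWith(int threads, int incrementsPerThread) {
        LockedCounter counter = new LockedCounter();
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
                done.countDown();            // this worker has finished
            }).start();
        }
        try {
            done.await();                    // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 1000));
    }
}
```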
Thread Safety
3) Locks
=> can lead to deadlocks
Solution: sort input accounts and synchronize in-order

Synchronize - deadlock
public void transfer(Account a, Account b, double sum)
{
    synchronized(a){ // Th1 locks account a
                     // Th2 locks account b
        synchronized(b){
            //transfer sum
        }
    }
}
Call function:
Th1: transfer(a,b,sum1);
Th2: transfer(b,a,sum1);
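The "sort and synchronize in-order" solution can be sketched as follows. The id field used to order the accounts is an assumption (the slides do not say how accounts are compared), and Bank and TransferDemo are hypothetical names.

```java
class Account {
    final int id;              // assumed unique; used only to order the locks
    double balance;

    Account(int id, double balance) {
        this.id = id;
        this.balance = balance;
    }
}

class Bank {
    static void transfer(Account from, Account to, double sum) {
        // Every thread locks the account with the smaller id first,
        // so two opposing transfers can never hold one lock each.
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= sum;
                to.balance += sum;
            }
        }
    }
}

class TransferDemo {
    /** Runs transfer(a,b) and transfer(b,a) concurrently; returns the total balance afterwards. */
    static double runOpposingTransfers(int rounds) {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread th1 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) Bank.transfer(a, b, 1);
        });
        Thread th2 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) Bank.transfer(b, a, 1);
        });
        th1.start();
        th2.start();
        try {
            th1.join();
            th2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return a.balance + b.balance;
    }

    public static void main(String[] args) {
        System.out.println("total balance: " + runOpposingTransfers(10_000));
    }
}
```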
Thread Safety
4) Atomicity - needed to avoid problems in case of:
a) Problem: race conditions
(getting the right answer depends on lucky timing)

Race Condition
public static Operation getInstance(){
    if(instance == null){
        instance = new Operation();
    }
    return instance;
}
If Th1 and Th2 are both running inside the if => two instances are created

Solution: synchronization. How?
1. synchronize entire method
   (inefficient once the instance is created)
2. synchronize the instantiation piece of code
   (i.e. only if "instance == null")
=> Thread Safety

Synchronized
// instance must be declared volatile for this double-checked locking to be safe
public static Operation getInstance(){
    if(instance == null){
        synchronized(Operation.class){
            if(instance == null){
                instance = new Operation();
            }
        }
    }
    return instance;
}
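A complete version of the synchronized variant might look like the sketch below. The volatile modifier on the field is needed for double-checked locking to be correct under the Java memory model; SingletonDemo and distinctInstances are illustrative additions used only to exercise the class.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class Operation {
    // volatile: other threads never see a partially constructed instance
    private static volatile Operation instance;

    private Operation() { }

    static Operation getInstance() {
        Operation local = instance;              // one volatile read on the fast path
        if (local == null) {
            synchronized (Operation.class) {
                local = instance;                // re-check under the lock
                if (local == null) {
                    local = new Operation();
                    instance = local;
                }
            }
        }
        return local;
    }
}

class SingletonDemo {
    /** Calls getInstance() from several threads; returns how many distinct objects were seen. */
    static int distinctInstances(int threads) {
        Set<Operation> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> seen.add(Operation.getInstance()));
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct instances: " + distinctInstances(8));
    }
}
```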
Thread Safety
4) Atomicity - needed to avoid problems in case of:
b) Problem: compound actions
i++;
Get the value of i & add one to it => more than one operation (read, add, write back);
Solution: Atomic data types:
Ex. AtomicInteger
=> Thread Safety

Compound Operations
int i=0;
i++;
/* Accessed simultaneously by both Th1 and Th2
   Can lead to inconsistencies:
   - result can be 1 (both threads got 0 and incremented to 1)
   - result can be 2 (second thread got the value 1 incremented by the first thread) */

Atomic Operations
AtomicInteger i = new AtomicInteger();
i.getAndIncrement();
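The difference between the two counters can be observed by incrementing both from two threads. In this sketch, AtomicCounterDemo and its plain field are illustrative; the plain counter is left unsynchronized on purpose, so its final value may vary from run to run.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    static int plain;                            // unsynchronized on purpose: updates may be lost

    /** Returns { plain counter, atomic counter } after two threads increment both. */
    static int[] count(int perThread) {
        plain = 0;
        AtomicInteger atomic = new AtomicInteger();
        Runnable work = () -> {
            for (int k = 0; k < perThread; k++) {
                plain++;                         // read, add, write back: not atomic
                atomic.incrementAndGet();        // one indivisible step
            }
        };
        Thread th1 = new Thread(work);
        Thread th2 = new Thread(work);
        th1.start();
        th2.start();
        try {
            th1.join();
            th2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { plain, atomic.get() };
    }

    public static void main(String[] args) {
        int[] result = count(100_000);
        System.out.println("plain = " + result[0] + ", atomic = " + result[1]);
    }
}
```

The atomic counter always ends at twice the per-thread count; the plain counter can end lower whenever two increments overlap.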
Thread Safety
5) Use thread safe collections
A) synchronized collections - synchronize all methods
List<String> list = Collections.synchronizedList(new ArrayList<String>());
(!!! Iterating over the collection requires external synchronization on the list)
B) use concurrent collections
BlockingQueue
ConcurrentHashMap (implements ConcurrentMap) - uses fine-grained locking instead of one lock for
the whole map; before Java 8, each lock controlled one segment of the hash table.
CopyOnWriteArrayList - achieves thread-safety by creating a separate
copy of the underlying array for each write operation.
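The iteration rules for the two kinds of list can be sketched as follows; CollectionsDemo and its method names are illustrative. A synchronized list must be locked by the caller for the whole loop, while a CopyOnWriteArrayList iterator works on a snapshot and tolerates writes during iteration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CollectionsDemo {
    /** Iterating a synchronized list: hold the list's own lock for the whole loop. */
    static int totalLength(List<String> syncList) {
        int total = 0;
        synchronized (syncList) {                // external synchronization
            for (String s : syncList) {
                total += s.length();
            }
        }
        return total;
    }

    /** Adds one element per existing element while iterating; returns the final size. */
    static int duplicateWhileIterating(List<String> cowList) {
        for (String s : cowList) {               // iterates over a snapshot
            cowList.add(s + "!");                // copies the array; the iterator is unaffected
        }
        return cowList.size();
    }

    public static void main(String[] args) {
        List<String> sync = Collections.synchronizedList(new ArrayList<String>());
        sync.add("put");
        sync.add("take");
        System.out.println(totalLength(sync));               // prints 7

        List<String> cow = new CopyOnWriteArrayList<>(sync);
        System.out.println(duplicateWhileIterating(cow));    // prints 4
    }
}
```

The same loop over a plain ArrayList would throw ConcurrentModificationException, which is why the copy-on-write variant suits lists that are read far more often than they are written.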
Assignment 2
[Diagram: Simulation Manager, Scheduler and Server. The producer side puts tasks (Task 0 to Task 6)
into Blocking Queues; server1 and server2 are the consumers and take tasks from them.]
public void run(){
    //process task
    Thread.sleep(currentTaskProcessingTime);
}
Assignment 2
Task
Modeled using:
arrivalTime
finishTime
processingPeriod
finishTime = arrivalTime + processingPeriod + waitingPeriodOnChosenServer
Assignment 2
Server -- Runnable
Modeled using:
Tasks (BlockingQueue<Task>…)
WaitingPeriod (AtomicInteger):
  decremented by current thread once a task is completed
  incremented by scheduler thread adding new tasks
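One possible shape for such a server is sketched below. It is only an illustration under assumptions, not the assignment's required design: the millisPerUnit scaling factor, the use of LinkedBlockingQueue, stopping the server by interrupting it, and the ServerDemo driver are all choices made for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class Task {
    final int arrivalTime;
    final int processingPeriod;                  // in simulated time units

    Task(int arrivalTime, int processingPeriod) {
        this.arrivalTime = arrivalTime;
        this.processingPeriod = processingPeriod;
    }
}

class Server implements Runnable {
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
    private final AtomicInteger waitingPeriod = new AtomicInteger();
    private final long millisPerUnit;            // assumption: real time slept per simulated unit

    Server(long millisPerUnit) {
        this.millisPerUnit = millisPerUnit;
    }

    /** Called by the scheduler thread. */
    void addTask(Task task) {
        waitingPeriod.addAndGet(task.processingPeriod);
        tasks.add(task);
    }

    int getWaitingPeriod() {
        return waitingPeriod.get();
    }

    @Override
    public void run() {
        try {
            while (true) {
                Task task = tasks.take();        // blocks while the queue is empty
                Thread.sleep(task.processingPeriod * millisPerUnit);
                waitingPeriod.addAndGet(-task.processingPeriod);   // task completed
            }
        } catch (InterruptedException e) {
            // interrupted: stop serving
        }
    }
}

class ServerDemo {
    /** Returns { waiting period after queuing two tasks, waiting period once both are processed }. */
    static int[] simulate() {
        Server server = new Server(1);
        server.addTask(new Task(0, 2));
        server.addTask(new Task(1, 3));
        int queued = server.getWaitingPeriod();
        Thread worker = new Thread(server);
        worker.start();
        try {
            long deadline = System.currentTimeMillis() + 5_000;
            while (server.getWaitingPeriod() > 0 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            worker.interrupt();                  // stop the server loop
            worker.join(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { queued, server.getWaitingPeriod() };
    }
}
```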
Assignment 2
Scheduler
Sends tasks to Servers according to the
established strategy
Modeled Using :
Servers
Constraints:
maxNoServers, maxLoadPerServer
Assignment 2
Scheduler - Strategy Pattern
Choose the policy to distribute clients
https://en.wikipedia.org/wiki/Strategy_pattern
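A minimal sketch of how the Strategy pattern could be applied to the scheduler. All names here (ServerInfo, Strategy, ShortestQueueStrategy, ShortestTimeStrategy, changeStrategy, dispatch) and the two policies are assumptions for illustration; the slides do not prescribe them.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical read-only view of a server: only what the policies need.
class ServerInfo {
    final String name;
    final int queueLength;
    final int waitingPeriod;

    ServerInfo(String name, int queueLength, int waitingPeriod) {
        this.name = name;
        this.queueLength = queueLength;
        this.waitingPeriod = waitingPeriod;
    }
}

// The interchangeable policy. Implementations expect a non-empty list.
interface Strategy {
    ServerInfo choose(List<ServerInfo> servers);
}

class ShortestQueueStrategy implements Strategy {
    @Override
    public ServerInfo choose(List<ServerInfo> servers) {
        return Collections.min(servers, Comparator.comparingInt((ServerInfo s) -> s.queueLength));
    }
}

class ShortestTimeStrategy implements Strategy {
    @Override
    public ServerInfo choose(List<ServerInfo> servers) {
        return Collections.min(servers, Comparator.comparingInt((ServerInfo s) -> s.waitingPeriod));
    }
}

class Scheduler {
    private Strategy strategy;

    Scheduler(Strategy strategy) {
        this.strategy = strategy;
    }

    void changeStrategy(Strategy strategy) {     // the policy can be swapped at runtime
        this.strategy = strategy;
    }

    ServerInfo dispatch(List<ServerInfo> servers) {
        return strategy.choose(servers);         // the scheduler does not know which policy runs
    }
}

class StrategyDemo {
    public static void main(String[] args) {
        List<ServerInfo> servers = Arrays.asList(
                new ServerInfo("server1", 3, 4),
                new ServerInfo("server2", 1, 9));
        Scheduler scheduler = new Scheduler(new ShortestQueueStrategy());
        System.out.println(scheduler.dispatch(servers).name);   // prints server2
        scheduler.changeStrategy(new ShortestTimeStrategy());
        System.out.println(scheduler.dispatch(servers).name);   // prints server1
    }
}
```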
Assignment 2
Simulation Manager- Runnable
Randomly generates the tasks with:
Arrival time
processingPeriod
Contains simulation loop:
CurrentTime
Call scheduler to dispatch tasks
Update UI
Java Concurrency
Advanced Concepts
Contents
Single thread design vs Multi-Thread Design
Java Executor Framework
Result bearing Jobs: Callable vs Runnable
Swing Concurrency Support
Job Execution
Web Application executing user jobs
1. Single thread design
Create one thread per application
Poor performance handling only one job at a time
The other jobs are waiting for the previous to complete
=> poor responsiveness
[Diagram: Job 1 to Job 5 queued for a single Thread]
Job Execution
Web Application executing user jobs
2. Multiple threads design
Create one thread for each job
Improved performance: jobs are completed in parallel
!!! Job handling must be thread safe
[Diagram: Job 1 to Job 5, each handled by its own thread (Thread 1 to Thread 5)]
Job Execution
Web Application executing user jobs
2. Multiple threads design
Unbounded thread creation
1. Thread lifecycle overhead
   Thread creation and tear down are not free
2. Resource consumption
   Creating more threads than the available processors does not help
   (it may even hurt: put pressure on garbage collector and compete for CPU)
3. Stability
   Each system has a limit of threads that can be created influenced by:
   - JVM parameters, stack size, underlying OS, etc.
   - Exceed limit => OutOfMemoryError
[Diagram: Job 1 to Job 5, one thread per job]
Job Execution
Web Application executing user jobs
3. Executor Framework
Executor: simple interface that provides the basis for a
flexible and powerful framework
Lifecycle support, hooks for adding statistics gathering,
application management, and monitoring
Select the optimal policy at runtime, depending on the available
hardware

Executor
Executor exec = Executors.newFixedThreadPool(100);
Runnable task = new Runnable() {
    public void run() {
        //execute job
    }
};
exec.execute(task);
Job Execution
Web Application executing user jobs
3. Executor Framework- Thread pools
newFixedThreadPool - fixed number of threads
newCachedThreadPool - creates new threads as needed and reuses idle ones
newSingleThreadExecutor - one thread (automatically replaced if it dies). Tasks are processed sequentially, in the order imposed by the task queue (FIFO, LIFO, priority, etc.)
newScheduledThreadPool - fixed size; supports delayed and periodic execution
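Two of these properties can be checked directly: a single-thread executor runs jobs in submission order, and a fixed pool never uses more threads than its size. A minimal sketch, with PoolDemo and its helper methods as illustrative names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolDemo {
    /** Submits jobs 0..jobs-1 to a single-thread executor and returns the order in which they ran. */
    static List<Integer> runInOrder(int jobs) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < jobs; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        finish(single);
        return order;
    }

    /** Runs many jobs on a fixed pool and returns how many distinct worker threads were used. */
    static int threadsUsed(int poolSize, int jobs) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < jobs; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        finish(pool);
        return names.size();
    }

    // Stops accepting jobs and waits for the submitted ones to complete.
    private static void finish(ExecutorService pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));       // prints [0, 1, 2, 3, 4]
        System.out.println("threads used: " + threadsUsed(3, 20));
    }
}
```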
Job Execution
Web Application executing user jobs
3. Executor Framework -Lifecycle
Non-daemon threads (failing to shut down an Executor could prevent the JVM from exiting; i.e. the thread continues to run even
after the main thread terminates)
ExecutorService - interface extending Executor and providing methods for handling lifecycle management operations:
shutdown() //graceful shutdown: no new tasks are accepted; the previously submitted tasks are allowed to complete
shutdownNow() //abrupt shutdown: attempts to cancel all running tasks and does not start the queued ones
isShutdown()
isTerminated() // after all tasks have terminated, the Executor transitions to terminated state;
awaitTermination(long timeout, TimeUnit unit)
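These methods are commonly combined into a two-phase shutdown: ask politely first, then force. The sketch below is one such combination; ShutdownDemo, the helper names and the five-second timeouts are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    /** Graceful shutdown first; falls back to shutdownNow() if jobs do not finish in time. */
    static boolean shutdownAndAwait(ExecutorService pool) {
        pool.shutdown();                                     // no new tasks are accepted
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();                          // interrupt whatever is still running
                return pool.awaitTermination(5, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns { terminated in time, isShutdown, isTerminated, late job rejected }. */
    static boolean[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("job done"));
        boolean terminated = shutdownAndAwait(pool);
        boolean rejected;
        try {
            pool.execute(() -> { });                         // submitted after shutdown
            rejected = false;
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        return new boolean[] { terminated, pool.isShutdown(), pool.isTerminated(), rejected };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("terminated=" + r[0] + " isShutdown=" + r[1]
                + " isTerminated=" + r[2] + " rejected=" + r[3]);
    }
}
```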
Job Execution
Web Application executing user jobs
4. Result bearing Jobs
Callable vs Runnable
Callable = runnable on steroids
Future: the result of submitting a Callable or a
Runnable task to the Executor Service

Callable vs Runnable
public interface Callable<V> {
    V call() throws Exception;
}
public interface Runnable{
    void run();
}
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException,
               TimeoutException;
}
Job Execution
Web Application executing user jobs
4. Result bearing Jobs
Executor service usage :
execute(Runnable) – executes the task – no way of obtaining the result
submit(Runnable) - returns a Future object – one can check if the Runnable has finished execution
future.get(); //returns null if the task has finished correctly.
submit(Callable) – returns a Future object
future.get(); //returns the value returned by call – it’s blocking
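The two submit variants can be compared side by side. A minimal sketch; FutureDemo and its job definitions are illustrative, and checked exceptions from get() are wrapped so the helper can be called without a throws clause.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {
    /** Returns { result of the Callable, result of the Runnable }. */
    static Object[] submitBoth() {
        Callable<Integer> computation = () -> 40 + 2;                        // returns a value
        Runnable sideEffect = () -> System.out.println("side effect only");  // returns nothing
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> sum = pool.submit(computation);
            Future<?> done = pool.submit(sideEffect);
            // get() blocks until the corresponding job has finished
            return new Object[] { sum.get(), done.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("job failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] results = submitBoth();
        System.out.println("Callable result: " + results[0]);   // 42
        System.out.println("Runnable result: " + results[1]);   // null
    }
}
```

An exception thrown inside call() does not escape on the worker thread; it surfaces at get() wrapped in an ExecutionException, which is why the sketch unwraps getCause().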
Swing Concurrency Support
SwingWorker
Support for UI (using FutureTask and Executor):
  Cancellation
  Completion notification
  Progress indication
doInBackground - executes the long job
  One can publish intermediate results (publish method)
done - called once the doInBackground finishes
  One can access the result by calling get() (see Futures)
process - called asynchronously to process the published information
* UI components should only be allocated in done or process,
which are executed on the Event Dispatch Thread

import java.util.List;
import javax.swing.JOptionPane;
import javax.swing.SwingWorker;

public class SwingWorkerExample extends SwingWorker<Integer, Integer> {
    @Override
    protected Integer doInBackground() throws Exception {
        Thread.sleep(1000);
        publish(1);
        Thread.sleep(1000);
        publish(2);
        Thread.sleep(1000);
        publish(3);
        return 13;
    }
    @Override
    protected void done() {
        try {
            JOptionPane.showMessageDialog(null, get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    protected void process(List<Integer> v) {
        for (int i=0; i < v.size(); i++) {
            System.out.println("received values: " + v.get(i));
        }
    }
}
References
[1] https://docs.oracle.com/javase/tutorial/essential/concurrency/index.html
[2] https://docs.oracle.com/javase/8/docs/api/java/util/Timer.html
[3] http://www.tutorialspoint.com/java/util/timer_schedule_period.htm
[4] http://www.javacodegeeks.com/2013/01/java-thread-pool-example-using-executors-and-threadpoolexecutor.html
[5] B. Goetz et al., Java Concurrency in Practice, 1st edition, Addison-Wesley Professional, May 19, 2006