Java Multithreading | Part 5
Published on July 22, 2024 • 12 min read

Basic Thread Handling in Java
Let's start by exploring various thread handling methods with examples in Java.
- Joining Threads
- Daemon Threads
- Sleeping Threads
- Interrupting Threads
Joining Threads
A thread is always created by another thread except for the main application thread. Let's study the following code snippet:
class Demonstration {
    public static void main(String[] args) throws InterruptedException {
        ExecuteMe executeMe = new ExecuteMe();
        Thread innerThread = new Thread(executeMe);
        // Daemon threads do not keep the JVM alive
        innerThread.setDaemon(true);
        innerThread.start();
    }
}
class ExecuteMe implements Runnable {
    public void run() {
        while (true) {
            System.out.println("Say Hello over and over again.");
            try {
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                // swallow interrupted exception
            }
        }
    }
}
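If you execute the above code, you'll see little or no output. Because innerThread is marked as a daemon, the JVM does not wait for it: the main thread returns right after starting innerThread, and once the last non-daemon thread has finished, the JVM exits and takes the daemon thread down with it. If we want the main thread to wait for innerThread to finish before proceeding, we can use the join method. Keep in mind that join only returns once the thread terminates, so with the endless loop in ExecuteMe it would block forever: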
Thread innerThread = new Thread(executeMe);
innerThread.start();
innerThread.join();
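As a minimal sketch of what join guarantees, the following uses a worker that finishes on its own, unlike the endless loop in ExecuteMe. The class name JoinDemo and the method runAndJoin are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class JoinDemo {
    // Returns the order in which the worker and the caller recorded their events.
    static List<String> runAndJoin() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200); // simulate some work
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            events.add("worker finished");
        });
        worker.start();
        try {
            worker.join(); // block until the worker terminates
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        events.add("main resumed");
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(runAndJoin());
    }
}
```

Because the caller blocks in join, its own event is always recorded after the worker's.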
Daemon Threads
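A daemon thread runs in the background and does not keep the JVM alive: the JVM exits once all non-daemon threads (including the main thread) have finished, abandoning any daemon threads that are still running. A thread can be marked as a daemon, before it is started, as follows: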
innerThread.setDaemon(true);
If a spawned thread isn't marked as a daemon, the JVM will wait for it to finish before tearing down the process.
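The sketch below illustrates two properties of the daemon flag: it can be read back with isDaemon, and it can only be set before the thread starts. DaemonFlagDemo and its method names are hypothetical; the latch exists only to keep the second thread alive while the flag is changed.

```java
import java.util.concurrent.CountDownLatch;

class DaemonFlagDemo {
    // Marks a thread as a daemon before starting it and reads the flag back.
    static boolean isDaemonAfterMarking() {
        Thread t = new Thread(() -> { });
        t.setDaemon(true); // must happen before start()
        return t.isDaemon();
    }

    // Tries to change the flag on a thread that is already running.
    static boolean markingAfterStartFails() {
        CountDownLatch release = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                release.await(); // keep the thread alive until released
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        try {
            t.setDaemon(true);
            return false; // not expected for a live thread
        } catch (IllegalThreadStateException expected) {
            return true;
        } finally {
            release.countDown(); // let the thread finish
        }
    }

    public static void main(String[] args) {
        System.out.println(isDaemonAfterMarking());
        System.out.println(markingAfterStartFails());
    }
}
```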
Sleeping Threads
A thread can be made dormant for a specified period using the sleep method. However, avoid using sleep for thread coordination, because a fixed delay only guesses at when another thread will be ready. Here is an example:
class SleepThreadExample {
    public static void main(String[] args) throws Exception {
        ExecuteMe executeMe = new ExecuteMe();
        Thread innerThread = new Thread(executeMe);
        innerThread.start();
        innerThread.join();
        System.out.println("Main thread exiting.");
    }
    static class ExecuteMe implements Runnable {
        public void run() {
            System.out.println("Hello. innerThread going to sleep");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                // swallow interrupted exception
            }
        }
    }
}
In this example, innerThread sleeps for 1 second, and the main thread exits only after innerThread is done processing.
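To show one alternative to coordinating with sleep, here is a small sketch that uses a CountDownLatch from java.util.concurrent, so the waiting thread resumes exactly when the result is ready. This is one option among several; LatchInsteadOfSleep and waitForResult are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class LatchInsteadOfSleep {
    // Waits for a value produced by another thread without guessing a delay.
    static String waitForResult() {
        CountDownLatch ready = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        Thread producer = new Thread(() -> {
            result.set("computed value");
            ready.countDown(); // signal that the result is available
        });
        producer.start();
        try {
            ready.await(); // returns as soon as countDown() has been called
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(waitForResult());
    }
}
```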
Interrupting Threads
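To handle a rogue thread that sleeps forever or goes into an infinite loop, we can use the interrupt method. A thread blocked in sleep, wait, or join receives an InterruptedException; a thread that is busy looping only has its interrupt flag set and must check that flag itself. Here's an example: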
class HelloWorld {
    public static void main(String[] args) throws InterruptedException {
        ExecuteMe executeMe = new ExecuteMe();
        Thread innerThread = new Thread(executeMe);
        innerThread.start();
        // Interrupt innerThread after waiting for 5 seconds
        System.out.println("Main thread sleeping at " + System.currentTimeMillis() / 1000);
        Thread.sleep(5000);
        innerThread.interrupt();
        System.out.println("Main thread exiting at " + System.currentTimeMillis() / 1000);
    }
    static class ExecuteMe implements Runnable {
        public void run() {
            try {
                // sleep for a thousand seconds (1000 * 1000 ms)
                System.out.println("innerThread goes to sleep at " + System.currentTimeMillis() / 1000);
                Thread.sleep(1000 * 1000);
            } catch (InterruptedException ie) {
                System.out.println("innerThread interrupted at " + System.currentTimeMillis() / 1000);
            }
        }
    }
}
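The example above covers a sleeping thread. For the infinite-loop case, a sketch of cooperative cancellation could look like the following, where the loop polls its own interrupt flag. InterruptLoopDemo and interruptStopsBusyLoop are hypothetical names, and the counter is only a stand-in for real work.

```java
import java.util.concurrent.CountDownLatch;

class InterruptLoopDemo {
    // Starts a busy-looping worker, interrupts it, and reports whether it stopped.
    static boolean interruptStopsBusyLoop() {
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            long iterations = 0;
            // Cooperative cancellation: poll the interrupt flag on every pass.
            while (!Thread.currentThread().isInterrupted()) {
                iterations++; // stand-in for real work
            }
        });
        worker.start();
        try {
            started.await();    // make sure the worker has started
            worker.interrupt(); // only sets the flag; the loop must notice it
            worker.join(5000);  // wait up to 5 seconds for the worker to exit
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(interruptStopsBusyLoop());
    }
}
```

A loop that never checks the flag would keep running after interrupt, which is why the check belongs in the loop condition.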
Executor Framework
In Java, the primary abstraction for executing logical units of work is the Executor framework, not the Thread class. The classes in the Executor framework separate:
- Task Submission
- Task Execution
A task can be represented by an object of a class implementing the Runnable interface.
The framework allows specifying different policies for task execution. Java offers three interfaces for executing tasks and managing the executor lifecycle:
- Executor Interface
- ExecutorService
- ScheduledExecutorService
The Executor interface forms the basis for the asynchronous task execution framework in Java.
Executor Interface
You don't need to create your own executor class, as Java's java.util.concurrent package offers several types of executors suitable for different scenarios.
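Although you rarely write one yourself, a tiny sketch helps show how little the Executor interface demands: a single execute(Runnable) method whose implementation decides where the task runs. The two executors below are illustrative only, and ExecutorSketch with its field names is hypothetical.

```java
import java.util.concurrent.Executor;

class ExecutorSketch {
    // Runs each task synchronously on the submitting thread.
    static final Executor DIRECT = Runnable::run;

    // Runs each task on a freshly created thread.
    static final Executor THREAD_PER_TASK = task -> new Thread(task).start();

    // Reports which thread the direct executor used for a task.
    static String threadUsedByDirect() {
        String[] name = new String[1];
        DIRECT.execute(() -> name[0] = Thread.currentThread().getName());
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("direct ran on: " + threadUsedByDirect());
        THREAD_PER_TASK.execute(() ->
                System.out.println("per-task ran on: " + Thread.currentThread().getName()));
    }
}
```

The submitting code is identical in both cases; only the executor behind the interface changes.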
Thread Pools
Thread pools in Java implement the Executor interface. They decouple task submission from execution, allowing flexible configuration and seamless switching between executors.
A thread pool has worker threads assigned to tasks. Once a task is finished, the thread returns to the pool. Thread pools are typically bound to a queue from which tasks are dequeued for execution.
Benefits of Thread Pools
- Lower Latency: Worker threads are reused, so a task usually does not pay the cost of creating a thread.
- Memory Management: Limits prevent excessive memory usage.
- Throughput Control: Fine-tuning ensures optimal system performance.
- Graceful Degradation: The system manages load without crashing.
Let's understand the need for thread pools with an example. Say you are asked to design a method that processes client purchase orders promptly. Let's explore the possible approaches.
Sequential Approach
In the sequential approach, the method accepts an order and processes it before moving to the next one. This method blocks other requests until the current request is processed.
void receiveAndExecuteClientOrders() {
    while (true) {
        Order order = waitForNextOrder();
        order.execute();
    }
}
This code processes each order sequentially and lacks responsiveness and throughput.
Unbounded Thread Approach
A novice might improve the sequential approach by spawning a new thread for each order.
void receiveAndExecuteClientOrdersBetter() {
    while (true) {
        final Order order = waitForNextOrder();
        // One new thread per order
        Thread thread = new Thread(new Runnable() {
            public void run() {
                order.execute();
            }
        });
        thread.start();
    }
}
This approach accepts an order and creates a thread to handle its execution. However, it has significant drawbacks:
- Thread creation and teardown are costly.
- Active threads consume memory even when idle.
- There's a limit on the number of threads the JVM and OS can handle.
- The application can become unresponsive with a high volume of requests, leading to a backlog and potential crashes.
Clearly, neither of the two approaches above handles this situation efficiently. Next, we will see how to overcome these issues using thread pools.
Using Thread Pools
void receiveAndExecuteClientOrdersBest() {
    int expectedConcurrentOrders = 100;
    Executor executor = Executors.newFixedThreadPool(expectedConcurrentOrders);
    while (true) {
        final Order order = waitForNextOrder();
        // Hand the order to the pool instead of creating a thread
        executor.execute(new Runnable() {
            public void run() {
                order.execute();
            }
        });
    }
}
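The example uses a fixed thread pool to handle client orders: at most expectedConcurrentOrders orders execute at once on reused threads, and additional orders wait in the pool's queue.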
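Since Order and waitForNextOrder are not defined in the snippet, here is a self-contained sketch of the same idea with a finite number of placeholder tasks, plus an orderly shutdown. OrderPoolSketch and processOrders are hypothetical names, and the counter increment stands in for order.execute().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class OrderPoolSketch {
    // Runs orderCount placeholder orders on poolSize threads and
    // returns how many of them completed.
    static int processOrders(int orderCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < orderCount; i++) {
            pool.execute(processed::incrementAndGet); // stand-in for order.execute()
        }
        pool.shutdown(); // stop accepting work, finish what was submitted
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on tasks that are still pending
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processOrders(50, 4));
    }
}
```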
Types of Thread Pools
Java offers various preconfigured thread pool implementations, each suitable for different use cases. We'll explore the key types available in the java.util.concurrent package.
Fixed Thread Pool
newFixedThreadPool creates a pool with a fixed number of threads. Any number of tasks can be submitted; tasks that arrive while all threads are busy wait in an unbounded queue, and a thread that finishes a task is reused for the next one.
ExecutorService fixedPool = Executors.newFixedThreadPool(10);
Single Thread Executor
newSingleThreadExecutor uses a single worker thread to process tasks sequentially. If the thread dies because of a failure, it is replaced with a new one.
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
Cached Thread Pool
newCachedThreadPool creates new threads as needed and reuses idle ones. It terminates threads that remain idle for 60 seconds to conserve memory, making it ideal for many short-lived tasks.
ExecutorService cachedPool = Executors.newCachedThreadPool();
Scheduled Thread Pool
newScheduledThreadPool allows tasks to be executed periodically or after a delay.
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
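A short sketch of delayed execution follows. It schedules a one-shot task and measures how long the pool actually waited before running it. ScheduledSketch and observedDelayMillis are hypothetical names; periodic execution would use scheduleAtFixedRate or scheduleWithFixedDelay instead of schedule.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledSketch {
    // Schedules a one-shot task and returns how many milliseconds passed
    // between scheduling it and the moment it actually ran.
    static long observedDelayMillis(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long scheduledAt = System.nanoTime();
            Callable<Long> task = System::nanoTime; // reports when it ran
            ScheduledFuture<Long> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            long ranAt = future.get(); // blocks until the task has run
            return TimeUnit.NANOSECONDS.toMillis(ranAt - scheduledAt);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran after ~" + observedDelayMillis(200) + " ms");
    }
}
```

The observed delay is at least the requested delay; how much longer depends on system load.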
ForkJoinPool
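newWorkStealingPool creates a ForkJoinPool, which suits tasks that split into subtasks and combine results upon completion, applying the divide and conquer paradigm. By default, its target parallelism equals the number of available processors.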
ExecutorService forkJoinPool = Executors.newWorkStealingPool();
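To make the divide and conquer idea concrete, here is a sketch of a RecursiveTask that sums a range of numbers by splitting it in half until the pieces are small. RangeSum and its threshold are hypothetical choices, and the sketch constructs a ForkJoinPool directly (rather than through Executors.newWorkStealingPool) so that invoke is available.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this, sum sequentially
    private final long from; // inclusive
    private final long to;   // exclusive

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                          // run the left half asynchronously
        return right.compute() + left.join(); // do the right half here, then combine
    }

    // Sums the integers in [from, to) on a dedicated pool.
    static long sum(long from, long to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 100_001)); // sum of 0..100000
    }
}
```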
Executor Lifecycle
Executors have distinct lifecycle stages:
- Running: Accepting new tasks and processing them.
- Shutting Down: No longer accepting new tasks but completing existing ones.
- Terminated: All tasks are finished, and the executor is shut down.
Executors can shut down gracefully, allowing tasks to complete, or abruptly, canceling ongoing tasks.
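The lifecycle stages can be observed through isShutdown and isTerminated. The sketch below walks one executor through all three stages, using a latch to hold a task open so the intermediate state is visible. LifecycleSketch and observe are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class LifecycleSketch {
    static List<String> observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        List<String> seen = new ArrayList<>();

        // Running: tasks are accepted.
        pool.execute(() -> {
            try {
                release.await(); // hold the task open
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        seen.add("running: isShutdown=" + pool.isShutdown());

        // Shutting down: existing tasks continue, new ones are refused.
        pool.shutdown();
        seen.add("shutting down: isShutdown=" + pool.isShutdown()
                + " isTerminated=" + pool.isTerminated());
        try {
            pool.execute(() -> { });
            seen.add("late task accepted");
        } catch (RejectedExecutionException ree) {
            seen.add("late task rejected");
        }

        // Terminated: reached once the held task is released and finishes.
        release.countDown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        seen.add("terminated: isTerminated=" + pool.isTerminated());
        return seen;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

For an abrupt stop, shutdownNow() interrupts running tasks and returns the tasks that never started.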
ThreadPoolExecutor
While the ThreadPoolExecutor class offers fine-tuning options for unusual use cases, it is generally recommended to use thread pools from the Executors factory methods due to their pre-configured settings for common scenarios. Here are some of the provided thread pools:
- Executors.newCachedThreadPool(): Unbounded thread pool with automatic thread reclamation.
- Executors.newFixedThreadPool(int): Fixed size thread pool.
- Executors.newSingleThreadExecutor(): Single background thread.
- Executors.newScheduledThreadPool(int): Fixed size thread pool supporting delayed and periodic task execution.
When none of these fit, you can create a customized thread pool by instantiating ThreadPoolExecutor directly. To use it effectively, it is crucial to understand the parameters required to instantiate it. Let's see the constructor and each of its arguments:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
Core Parameters
corePoolSize
The number of threads to keep in the pool, even if they are idle.
- allowCoreThreadTimeOut: If set to true (default is false), idle core threads will terminate after the keep-alive time.
keepAliveTime
The time that threads in excess of the core pool size will wait for new tasks before terminating.
maximumPoolSize
The maximum number of threads allowed in the pool. If the number of threads is equal to corePoolSize and the queue is full, new threads will be created up to this limit.
TimeUnit
The time unit for keepAliveTime, such as milliseconds, seconds, or hours.
BlockingQueue
The queue used to hold tasks before they are executed by worker threads.
- Bounded Queue: A queue with fixed capacity (e.g., ArrayBlockingQueue).
- Unbounded Queue: A queue with no fixed capacity (e.g., LinkedBlockingQueue created without a capacity argument).
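The choice of queue also affects how the pool grows. Because extra threads are only created when the queue refuses a task, an unbounded queue keeps the pool at corePoolSize. The sketch below shows this with tasks that block on a latch; QueueChoiceSketch and poolSizeAfter are hypothetical names, and the pool sizes (core 1, maximum 4) are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoiceSketch {
    // Submits taskCount blocking tasks to a pool (core 1, max 4) backed by
    // the given queue and returns the resulting number of threads.
    // Assumes taskCount does not exceed what the pool and queue can hold.
    static int poolSizeAfter(BlockingQueue<Runnable> queue, int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, queue);
        Runnable blocker = () -> {
            try {
                release.await(); // keep every worker busy
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(blocker);
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: everything beyond the first task is queued.
        System.out.println(poolSizeAfter(new LinkedBlockingQueue<>(), 10));
        // Bounded queue of 2: one task runs, two are queued, two force new threads.
        System.out.println(poolSizeAfter(new ArrayBlockingQueue<>(2), 5));
    }
}
```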
ThreadFactory
A factory for creating new threads. This provides customization options such as:
- Custom thread names
- Custom thread priorities
- Setting the thread daemon flag
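A sketch of a factory covering all three customizations might look like this. NamedThreadFactory and its constructor parameters are hypothetical; an AtomicInteger is used for numbering so the factory stays correct if several threads call it at once.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final int priority;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon, int priority) {
        this.prefix = prefix;
        this.daemon = daemon;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Custom name: prefix plus a running number
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(daemon);     // custom daemon flag
        t.setPriority(priority); // custom priority
        return t;
    }

    public static void main(String[] args) {
        NamedThreadFactory factory =
                new NamedThreadFactory("worker", true, Thread.NORM_PRIORITY);
        Thread t = factory.newThread(() -> { });
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
    }
}
```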
RejectedExecutionHandler
A handler for tasks that cannot be accepted by the thread pool, either because the pool is saturated (all threads busy and the queue full) or because it has been shut down. A custom handler is also useful for logging or debugging purposes. The built-in policies are:
- AbortPolicy: Throws RejectedExecutionException. This is the default.
- CallerRunsPolicy: Executes the rejected task in the caller's thread.
- DiscardPolicy: Silently discards the rejected task.
- DiscardOldestPolicy: Discards the oldest task in the queue to accommodate the new task.
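Besides the built-in policies, you can implement RejectedExecutionHandler yourself. The following sketch logs and counts rejections instead of throwing. CountingRejectionHandler and its demo method are hypothetical, and the tiny pool (one thread, queue of one) is chosen only to force rejections.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
        System.err.println("Rejected a task; pool size=" + executor.getPoolSize()
                + ", queued=" + executor.getQueue().size());
    }

    int rejectedCount() {
        return rejected.get();
    }

    // Submits four blocking tasks to a pool that can hold only two.
    static int demo() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 4; i++) {
            pool.execute(blocker); // 1 runs, 1 is queued, the rest are rejected
        }
        release.countDown();
        pool.shutdown();
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + demo());
    }
}
```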
Example of ThreadPoolExecutor
package multithreading;
import java.util.concurrent.*;
public class ThreadPoolExecutorExample {
    public static void main(String[] args) {
        // core size 2, maximum size 5, keep-alive 1 minute, queue capacity 3
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
                TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(3),
                new CustomThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        int i = 0;
        try {
            // submit ten tasks
            for (; i < 10; i++) {
                int finalI = i;
                threadPoolExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("Task: " + (finalI + 1) + " Executing by worker thread :" + Thread.currentThread().getName());
                        try {
                            // simulate work by sleeping for 1 second
                            Thread.sleep(1000);
                        } catch (InterruptedException ie) {
                            // ignore for now
                        }
                    }
                });
            }
        } catch (RejectedExecutionException ree) {
            // Let's see which task gets rejected
            System.out.println("Task " + (i + 1) + " rejected.");
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
class CustomThreadFactory implements ThreadFactory {
    private int threadId = 1;
    @Override
    public Thread newThread(Runnable r) {
        Thread th = new Thread(r);
        th.setName("CustomThread-" + threadId++);
        return th;
    }
}
Output:
Task 9 rejected.
Task: 6 Executing by worker thread :CustomThread-3
Task: 7 Executing by worker thread :CustomThread-4
Task: 2 Executing by worker thread :CustomThread-2
Task: 1 Executing by worker thread :CustomThread-1
Task: 8 Executing by worker thread :CustomThread-5
Task: 3 Executing by worker thread :CustomThread-4
Task: 4 Executing by worker thread :CustomThread-3
Task: 5 Executing by worker thread :CustomThread-5
Tasks 9 and 10 do not appear in the output because of the ThreadPoolExecutor.AbortPolicy() rejection policy. When the queue is full and the maximum number of threads are already in use, AbortPolicy rejects any new task by throwing a RejectedExecutionException. In this program the exception is caught outside the loop, so submission stops at task 9 and task 10 is never submitted.
Let's break down the situation:
- Core and Maximum Pool Size: The core pool size is 2 and the maximum pool size is 5.
- Blocking Queue: The queue is a LinkedBlockingDeque with a capacity of 3.
- Task Submission: The program tries to submit 10 tasks.
Execution Flow
- First 2 Tasks: Immediately picked up by the two core threads.
- Next 3 Tasks: Placed in the queue, as the core threads are busy.
- Tasks 6-8: New threads are created up to the maximum pool size of 5 to handle these tasks.
- Beyond Task 8: The pool is at its maximum capacity (5 threads) and the queue is full (3 tasks). AbortPolicy will then reject any additional tasks.
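The flow above can be checked with a sketch that records the pool size and queue length after each submission. It uses the same sizes as the example (core 2, maximum 5, queue capacity 3) but, as assumptions made for determinism, tasks block on a latch instead of sleeping and each rejection is caught individually so that task 10 is also attempted. ExecutionFlowTrace is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutionFlowTrace {
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 1,
                TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(3),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await(); // no task finishes while we are submitting
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int task = 1; task <= 10; task++) {
            try {
                pool.execute(blocker);
                log.add("task " + task + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException ree) {
                log.add("task " + task + ": rejected");
            }
        }
        release.countDown(); // let the accepted tasks complete
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The log shows the thread count stopping at 2 while the queue fills, then rising to 5 for tasks 6 through 8, followed by rejections.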
Fixing the Issue
To observe the execution of all tasks, you can do any of the following:
- Increase the Queue Size: This will allow more tasks to be queued before they get rejected.
- Change the Rejection Policy: Use a different RejectedExecutionHandler to handle rejected tasks in a different manner.
- Increase the Maximum Pool Size: Allow more threads to handle more tasks concurrently.
Fixing the Issue Using a Different Rejection Policy
- CallerRunsPolicy:
Executes the rejected task in the caller's thread.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
        TimeUnit.MINUTES,
        new LinkedBlockingDeque<>(3),
        new CustomThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());
Output:
Task: 1 Executing by worker thread :CustomThread-1
Task: 7 Executing by worker thread :CustomThread-4
Task: 9 Executing by worker thread :main
Task: 2 Executing by worker thread :CustomThread-2
Task: 6 Executing by worker thread :CustomThread-3
Task: 8 Executing by worker thread :CustomThread-5
Task: 4 Executing by worker thread :CustomThread-3
Task: 10 Executing by worker thread :CustomThread-2
Task: 3 Executing by worker thread :CustomThread-1
Task: 5 Executing by worker thread :CustomThread-5
Notice that in the output of the above program, when the thread pool can't accept any more tasks, the main thread that is submitting the tasks is itself pulled in to execute the rejected task. Consequently, the submission of new tasks slows down while the main thread is busy executing that task.
- DiscardPolicy:
Silently discards the rejected task.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
        TimeUnit.MINUTES,
        new LinkedBlockingDeque<>(3),
        new CustomThreadFactory(),
        new ThreadPoolExecutor.DiscardPolicy());
Output:
Task: 8 Executing by worker thread :CustomThread-5
Task: 6 Executing by worker thread :CustomThread-3
Task: 2 Executing by worker thread :CustomThread-2
Task: 1 Executing by worker thread :CustomThread-1
Task: 7 Executing by worker thread :CustomThread-4
Task: 3 Executing by worker thread :CustomThread-4
Task: 4 Executing by worker thread :CustomThread-5
Task: 5 Executing by worker thread :CustomThread-2
- DiscardOldestPolicy:
Discards the oldest task in the queue to accommodate the new task.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
        TimeUnit.MINUTES,
        new LinkedBlockingDeque<>(3),
        new CustomThreadFactory(),
        new ThreadPoolExecutor.DiscardOldestPolicy());
Output:
Task: 8 Executing by worker thread :CustomThread-5
Task: 6 Executing by worker thread :CustomThread-3
Task: 7 Executing by worker thread :CustomThread-4
Task: 2 Executing by worker thread :CustomThread-2
Task: 1 Executing by worker thread :CustomThread-1
Task: 5 Executing by worker thread :CustomThread-4
Task: 9 Executing by worker thread :CustomThread-3
Task: 10 Executing by worker thread :CustomThread-1
Tasks 3 and 4 are missing from this output: they were the oldest entries in the queue and were discarded to make room for tasks 9 and 10.
Shutting Down
To shut down a ThreadPoolExecutor, call the shutdown() method. A pool that is no longer referenced and has no remaining threads can be reclaimed without an explicit shutdown. If shutdown() isn't called, configure the pool so that idle threads eventually terminate by setting corePoolSize to zero and specifying an appropriate keepAliveTime. Alternatively, if corePoolSize is non-zero, use the allowCoreThreadTimeOut(boolean) method to apply the timeout policy to both core and non-core threads.
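As a sketch of the second option, the pool below has two core threads but allows them to time out, so the pool drains to zero threads on its own once it has been idle for the keep-alive period. IdleTimeoutSketch, the 100 ms keep-alive, and the 5 second polling limit are assumptions chosen to keep the demonstration short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleTimeoutSketch {
    // Returns true if all threads, including core threads, exit after going idle.
    static boolean coreThreadsTimeOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core threads now honor keepAliveTime

        pool.execute(() -> { }); // starts the first core thread
        pool.execute(() -> { }); // starts the second core thread

        // Poll until the idle threads have timed out, or give up after 5 seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return pool.getPoolSize() == 0;
    }

    public static void main(String[] args) {
        System.out.println("pool drained: " + coreThreadsTimeOut());
    }
}
```

Without allowCoreThreadTimeOut(true), the two core threads would stay alive and keep the JVM from exiting until shutdown() is called.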