We will first go over some basic consideration to decide whether and when we should use threads. Then we will talk about thread pools and the thread pool implementation provided by Java. Java provides some convenient methods for building ExecutorService backed by thread pool. Specifically, we will talk about Executors.newFixedThreadPool() and the pitfall related to this method that makes it unsuitable for produciton.
In what situations should we use threads
Before we even get into looking at using Java thread API properly, we should think about if we even need to use threads at all. What would be the appropriate situations to use threads? Because it is not uncommon to encounter scenarios where threads are used, but it is not necessary, nor helpful to use them in the first place.
We are going to mention 2 very common scenarios where threads are usually used and the second scenario is actually what we are more interested in:
First scenario is when we are writing GUI program where it is common to have a dedicated thread to handle UI events which may hand off heavy workload to other threads so the program can stay responsive.
The other scenario is that in a very common workload pattern, we are going to encounter a part of workload that will not require much CPU usage. The most common cases are any IO operations such as network access or file access. In this case, by putting our IO workloads onto different threads, we allow our OS or Runtime the opportunities to swap out those threads waiting on non-CPU work, and put in another thread that potentially require CPU. So, we can see the reason we want to use threads here is to achieve high CPU utilization and avoid CPU idle time.
To elaborate on this later scenario further, it seems like in general if our execution ever encounter any workload that will not require CPU then we should put it on a new thread. However, in practice, it is not practical to do so either due to the amount of work required or due to that every time we try to make use of threads there is also some overhead cost involved.
Besides the earlier mentioned practicality reason, there are also other scenarios we should consider that makes it not very helpful to use threads even if the execution pattern is the typical CPU intensive work followed by non-CPU work. One mundane case would be that in your program there is only one stream of execution and there is simply no other work to do.
Another more interesting case is that assume you have multiple parallel identical tasks happening with the typical CPU work then non-CPU work pattern. If your execution is non-conditional, in another word each of the identical task needs to do the CPU work then non-CPU work with no exception, then in this case it is still not helpful to use threads. The reason is that the non-CPU work will likely to be the obvious bottleneck here and it has one-to-one relationship with the CPU work. So, even if you use threads to finish the CPU work quickly, overall, your program will still need to wait for the non-CPU work to complete.
What is even more interesting about this case is that if we actually use threads here to handle the non-CPU work, in certain cases it will lead to potential memory leak and other undesirable behaviors. This is actually the critical issue we are going to explain and address in later part of the article.
With the above scenarios considered, if we want to take advantage of using threads to improve CPU utilization. We typically need a workload that is first CPU intensive and later will conditionally perform some non-CPU work. By placing the conditional non-CPU work on a thread, we can benefit from finishing those pure CPU tasks more quickly since they would not get stuck waiting for threads with non-CPU work hogging the CPU.
What if we simply spawn new threads and why we should use thread pools
Let’s look at the most basic way to use thread. That is, simply create a new thread and start it.
new Thread(someRunnableDoingIO).start();
Commonly, this code may go along with some server request processing code, or it may be part of some background processing job.
Even though this way of using threads seems simple and logically correct, but in operation, this can commonly lead to serious performance problem. The issue here is that there is no limitation to the number of threads we can spawn here. As more threads are spawned, it is going to use up more and more memory, and the performance of the entire program could come to a near halt due to the context switch between the threads.
To address this issue, the concept of thread pooling is introduced. A thread pool usually define a fixed limit on number of threads that can be active. If the maximum number of threads in a thread pool are all being used to execute some work, then new work that comes in will have to wait for a thread in the thread pool to free up in order to get executed. With the thread count limitation in place, it prevents the issues mentioned earlier where we could spawn way too many threads which could kill our program performance.
The Java API offers an implementation of thread pooling via the ThreadPoolExecutor class. The ThreadPoolExecutor class is commonly exposed through the ExecutorService interface. Java also provides some convenient methods for building ExecutorService backed by a thread pool. Most commonly there is the Executors.newFixedThreadPool() method that can return us an ExecutorService back by a thread pool with fixed number of threads. However, there is a serious flaw with this ExecutorService returned by the method that makes it not usable in production. We are going to examine this issue next and go over the solution.
About Executors.newFixedThreadPool() and why you should not use it in a production environment
There are several methods in java.util.concurrent.Executors providing simple calls to construct ExecutorService. We are more interested in ones related to thread pooling. So specifically, we will focus on looking at:
Executors.newFixedThreadPool(int nThreads)
Here is the JavaDoc on this method:
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.
First thing to notice here is probably what most would guess, this method returns an ExecutorService that is backed by a thread pool with a maximum of nThreads threads.
The important thing to keep in mind here is the number of available threads is fixed. So, as we submit more and more work for the ExecutorService, eventually we would have all the threads occupied and there would be no more available thread to take on the new work we are trying to submit. In that case, as the JavaDoc mentioned, the ExecutorService hold a queue and the newly submitted work will be placed onto this queue.
At first this may seem to be all fine, but there is one detail here that will cause issues in operation. Notice the doc mentioned it create an “unbounded” queue for us. Which means if the rate new work is being submitted to the ExecutorService outpace the rate works are being consumed by the threads in ExecutorService. Then this unbounded queue will just keep building up. This can first cause a potential memory leak situation. Second, the queue being built up will probably hold a lot of our unprocessed data. In case of a program crash, we are at risk of losing a large amount of data being held in the queue.
Let’s look at an example to explain the problem in details:
NewFixedThreadPoolUsage.java:
package ca.justsomethoughts.example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NewFixedThreadPoolUsage {
private static final Logger LOGGER = LoggerFactory.getLogger(NewFixedThreadPoolUsage.class);
private static final int THREAD_COUNT = 10;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < 1000000; ++i) {
executorService.execute(new NonCpuWork(i));
LOGGER.info("{} submitted to executorService.", i);
}
}
}
NonCpuWork.java:
package ca.justsomethoughts.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonCpuWork implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(NonCpuWork.class);
private static final int WORK_MILLISECONDS = 3000;
private int id;
public NonCpuWork(int id) {
this.id = id;
}
public void run() {
try {
LOGGER.info("Starting thread {}.", id);
// This sleep just represent some non-CPU work. Commonly, IO such as file read write,
// network read write, etc.
Thread.sleep(WORK_MILLISECONDS);
LOGGER.info("Finished thread {}.", id);
} catch (InterruptedException e) {
LOGGER.debug("Interrupted.", e);
}
}
}
Here is an excerpt of the output we will examine to explain the issue:
...lines omitted...
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999980 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999981 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999982 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999983 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999984 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999985 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999986 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999987 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999988 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999989 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999990 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999991 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999992 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999993 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999994 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999995 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999996 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999997 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999998 submitted to executorService.
[main] INFO ca.justsomethoughts.example.NewFixedThreadPoolUsage - 999999 submitted to executorService.
[pool-1-thread-3] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 12.
[pool-1-thread-1] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 11.
[pool-1-thread-3] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 20.
[pool-1-thread-1] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 21.
[pool-1-thread-8] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 15.
[pool-1-thread-8] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 22.
...more...
With our ExecutorService set to a fixed count of 10 threads, what we would expect to happen in our run is in the beginning the 10 threads will quickly get assigned to execute 10 instances of NonCpuWork. Because all threads in the ExecutorService are now occupied, new jobs submitted will now be placed in the internal queue in the ExecutorService. As the first 10 NonCpuWork are taking quite a while to finish, we can see in the output above our loop would proceed with queuing all the jobs in the ExecutorService. After all the jobs are queued the first 10 NonCpuWork finally finishes and another 10 jobs in the queue get executed.
This behavior is not suitable for production for the 2 reasons mentioned earlier. Imagine each instance of NonCpuWork holds a considerable amount of data, then we can see this accumulation of NonCpuWork in the ExecutorService can be a potential cause of out of memory error. The second reason, as mentioned earlier, is that with the queue build up and our program crashes we would lose a lot of data or miss important work that needed to be processed.
In a real world situation where I encountered this issue, each of the NonCpuWork could be a message pulled of a JMS message queue and converted to a NonCpuWork class. In the example above, imagine the looping thread is actually a JMS client thread pulling messages off JMS queue. Then here we really are just pulling messages off the JMS queue way too quickly and queuing them up in our ExecutorService because NonCpuWork cannot finish fast enough to keep pace with the speed new jobs are being queued. In another word, our program is just taking work way too fast from the JMS queue than it can actually handle. Also, to the second point we mentioned earlier, if the program crashes with a lot of NonCpuWork sitting in the queue in the ExecutorService, each of these NonCpuWork was just converted from a JMS message, it would be equivalent to losing those JMS messages.
For these reasons, Executors.newFixedThreadPool is actually not suitable for production. Next, we will look at the proper way to take advantage of ThreadPoolExecutor in order to implement a thread pool that is less likely to run into production issues.
How should we use the thread pool implementation, ThreadPoolExecutor, provided by Java API in a production ready way
We went over that if we simply use the convenient method Executors.newFixedThreadPool, it will return us an ExecutorService back by a thread pool. However, this ExecutorService has some serious flaws that makes it inappropriate for production. The root cause of these issues is because the ExecutorService uses an unbounded queue that will allow it to hold on to infinite number of jobs.
Therefore, to properly take advantage of the thread pool provided by Java, we would need to build our thread pool with a limited queue, and the Java API does provide a way to do so. We should use the ThreadPoolExecutor which is basically an implementation of ExecutorService that is backed by a thread pool, and we will use the following constructor to demonstrate how to use this class in a way more suitable for production:
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
)
Let’s take a look at the parameters:
corePoolSize is the number of threads that will be created and kept in the thread pool no matter current workload of the ThreadPoolExecutor.
maximumPoolSize is the maximum number of threads the ThreadPoolExecutor will hold.
keepAliveTime together with unit define the idle time before a thread gets deallocated. This deallocation will only happen when the ThreadPoolExecutor is holding more threads than the core pool size.
workQueue allows us to pass in the queue that will be holding the work/job to be executed on the threads.
handler is a RejectedExecutionHandler that allows us to specify how to handle the case when the ThreadPoolExecutor rejects handling some submitted work for any reason.
As mentioned earlier, we want to limit the number of jobs the ThreadPoolExecutor can hold. We can accomplish this by providing a workQueue that can only hold a specified maximum number of items. Java provides a few classes that implements the BlockingQueue interface. Here we will simply use the LinkedBlockingQueue class.
We can create a LinkedBlockingQueue with a specific capacity:
new LinkedBlockingQueue(int capacity)
Using this BlockingQueue in our ThreadPoolExecutor will ensure that our ThreadPoolExecutor will no longer accept infinite amount of jobs, but what happens when our workQueue got filled up to capacity? This is where the RejectedExecutionHandler come in to play. When our ThreadPoolExecutor needs to reject a job for any reason, including when the workQueue is filled up to capacity. Java will call RejectedExecutionHandler.rejectExecution from the thread that tries to submit the new work. So, here in our rejectExecution what we should do is first check if the reason for rejection is because workQueue is full. If we confirm workQueue being full is the reason for rejection, then because rejectExecution is executed on the thread that tried to submit the new work, we can wait a few seconds and try to submit the work again. Here is an example:
ThreadPoolExecutorUsage.java:
package ca.justsomethoughts.example;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThreadPoolExecutorUsage {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolExecutorUsage.class);
private static final int corePoolSize = 5;
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(corePoolSize * 2);
RejectedExecutionHandler handler = new WaitAndRetryRejectedExecutionHandler();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize * 2,
1, TimeUnit.MINUTES, workQueue, handler);
for (int i = 0; i < 1000000; ++i) {
threadPoolExecutor.execute(new NonCpuWork(i));
LOGGER.info("{} submitted to executorService.", i);
}
}
}
NonCpuWork.java:
package ca.justsomethoughts.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonCpuWork implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(NonCpuWork.class);
private static final int WORK_MILLISECONDS = 3000;
private int id;
public NonCpuWork(int id) {
this.id = id;
}
public void run() {
try {
LOGGER.info("Starting thread {}.", id);
// This sleep just represent some non-CPU work. Commonly, IO such as file read write,
// network read write, etc.
Thread.sleep(WORK_MILLISECONDS);
LOGGER.info("Finished thread {}.", id);
} catch (InterruptedException e) {
LOGGER.debug("Interrupted.", e);
}
}
}
WaitAndRetryRejectedExecutionHandler.java:
package ca.justsomethoughts.example;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WaitAndRetryRejectedExecutionHandler implements RejectedExecutionHandler {
private static final Logger logger =
LoggerFactory.getLogger(WaitAndRetryRejectedExecutionHandler.class);
private static final int WAIT_MILLISECONDS = 2000;
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
Thread.sleep(WAIT_MILLISECONDS);
executor.execute(r);
} catch (InterruptedException e) {
logger.warn("Interrupted waiting to re-submit runnable.", e);
}
}
}
}
Excerpt of the program output:
...lines omitted...
[pool-1-thread-3] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 24.
[pool-1-thread-9] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 8.
[pool-1-thread-9] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 25.
[pool-1-thread-7] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 12.
[pool-1-thread-7] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 26.
[pool-1-thread-2] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 11.
[pool-1-thread-10] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 10.
[pool-1-thread-2] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 27.
[pool-1-thread-5] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 7.
[pool-1-thread-10] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 28.
[pool-1-thread-5] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 29.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 30 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 31 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 32 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 33 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 34 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 35 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 36 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 37 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 38 submitted to executorService.
[main] INFO ca.justsomethoughts.example.ThreadPoolExecutorUsage - 39 submitted to executorService.
[pool-1-thread-4] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 20.
[pool-1-thread-4] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 30.
[pool-1-thread-8] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 21.
[pool-1-thread-8] INFO ca.justsomethoughts.example.NonCpuWork - Starting thread 31.
[pool-1-thread-3] INFO ca.justsomethoughts.example.NonCpuWork - Finished thread 24.
...more...
This execution pattern is a lot more desirable. The ThreadPoolExecutor will accept new jobs to run until all threads are occupied then it will also accept a limited number of jobs to hold in the internal workQueue. After that it will reject any new job. In our WaitAndRetryRejectedExecutorHandler, we make the submitter wait and retry until the ThreadPoolExecutor is able to handle more job.
Most importantly, our ThreadPoolExecutor will no longer accept any new job without limitation. It will accept jobs and execute jobs in batches. This solves issues that were present when using the ExecutorService returned by Executors.newFixedThreadPool(). Our program is less likely to run out of memory because the ExecutorService could be holding on to a large number of jobs, and we alleviate the problem if the program crash we lose everything being held in the ExecutorService internal workQueue. We could still lose some data, but not as serious.
In the real live scenario I was working with, which I mentioned previously, this also have another important implication. Without this wait and retry mechanism, we could be pulling way too many JMS messages off the JMS queue, and only end up shoving them into the ExecutorService. With this mechanism in place, it effectively works like a throttle so our program would not pull off more messages than it can actually handle. That means you would have a more accurate view of your whole system when you look at the message count of the JMS queue. Otherwise, this number could become a little bit obscured due to too many messages being pulled off.
Conclusion
We first looked at some situations where it is effective to use threads and some situations where it is not effective to use threads. Second, we looked at how issues could occur if we simply spawn new threads whenever we need them. Next, we looked at how using thread pooling could some of those issues. Then, we looked at Executors.newFixedThreadPool offered by Java API and the pitfall with using ExecutorService returned from this method call. Finally, we look at how to use ThreadPoolExecutor to implement a thread pool that is more ready for production operation.
Leave a comment