ForkJoinPool Example
There were a few homegrown frameworks that distributed work across multiple cores and then joined the partial results into a single result set. Java 7 incorporated this feature as the Fork/Join framework.
Basically, Fork/Join breaks the task at hand into smaller subtasks until each subtask is simple enough to be solved without further splitting. It's like a divide-and-conquer algorithm. One important concept in this framework is that, ideally, no worker thread is idle: it implements a work-stealing algorithm in which idle workers steal work from workers that are busy.
It's based on the work of Doug Lea, a thought leader on Java concurrency. Fork/Join deals with the threading hassles; you just indicate to the framework which portions of the work can be broken apart and handled recursively. The general pattern looks like this in pseudocode (as taken from Doug Lea's paper on the subject):
Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
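As a concrete illustration, here is a minimal sketch of how that pseudocode might map onto Java 7's RecursiveTask. It is not code from Doug Lea's paper; the class name SumTask and the THRESHOLD of 1,000 elements are illustrative assumptions. "Small" means a slice at or below the threshold; larger slices are split in half, one half is forked, the other is computed in the current thread, and the two subresults are added.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: sums numbers[from, to) by following solve(problem) above.
class SumTask extends RecursiveTask<Long> {
    // Assumed cutoff: slices at or below this size count as "small".
    private static final int THRESHOLD = 1_000;

    private final long[] numbers;
    private final int from;
    private final int to;

    SumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Problem is small: solve it directly.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        // Split the problem into two independent halves.
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(numbers, from, mid);
        SumTask right = new SumTask(numbers, mid, to);
        left.fork();                     // fork: an idle worker may steal this half
        long rightSum = right.compute(); // solve the other half in this thread
        long leftSum = left.join();      // join: wait for the forked half
        return leftSum + rightSum;       // compose the result from the subresults
    }

    // Convenience entry point: run the top-level task in a fresh pool.
    static long sum(long[] numbers) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(numbers, 0, numbers.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] numbers = new long[10_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(sum(numbers)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```

Computing one half directly instead of forking both avoids handing the pool a task that the current thread would otherwise just wait on.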
Discussion Points
1) Core Classes used in Fork/Join Framework
i) ForkJoinPool
ii) ForkJoinTask
2) Example Implementations of Fork/Join Pool Framework
i) Implementation Sourcecode
ii) How it works?
3) Difference between Fork/Join Framework And ExecutorService
4) Existing Implementations in JDK
5) Conclusion
ForkJoinPool
The ForkJoinPool is a specialized implementation of ExecutorService that implements the work-stealing algorithm described above. We create an instance of ForkJoinPool by providing the target parallelism level, i.e. the number of worker threads it should keep active, typically the number of available processors. The no-argument constructor uses the number of available processors by default.
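A minimal sketch of creating a pool with an explicit parallelism level equal to the number of available processors. The class PoolSetup and its createPool() method are hypothetical names used only for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class PoolSetup {
    // Creates a pool whose target parallelism is the number of available processors.
    static ForkJoinPool createPool() {
        int processors = Runtime.getRuntime().availableProcessors();
        return new ForkJoinPool(processors);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = createPool();
        System.out.println("Parallelism: " + pool.getParallelism());
        pool.shutdown();
    }
}
```

Passing a smaller number limits how many worker threads the pool keeps busy, which can be useful when the machine is shared with other work.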
ForkJoinTask
This is an abstract class for creating tasks that run within a ForkJoinPool. In Java 7, RecursiveAction and RecursiveTask are its only two direct subclasses (Java 8 added a third, CountedCompleter). The only difference between these two classes is that RecursiveAction does not return a value, while RecursiveTask returns an object of the specified type.
In both cases, you implement the compute() method in your subclass to perform the main computation desired by the task.
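For contrast with a value-returning RecursiveTask, here is a hedged sketch of a RecursiveAction whose compute() returns void because it works in place. The class name DoubleAction and the THRESHOLD value are assumptions for illustration; the split uses the static helper invokeAll(t1, t2) that ForkJoinTask provides.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles every element of data[from, to) in place.
// There is no result to return, so it extends RecursiveAction.
class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // assumed cutoff for "small"

    private final int[] data;
    private final int from;
    private final int to;

    DoubleAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
        } else {
            int mid = (from + to) >>> 1;
            // invokeAll runs both halves and returns once both have completed.
            invokeAll(new DoubleAction(data, from, mid), new DoubleAction(data, mid, to));
        }
    }

    static void doubleAll(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4};
        doubleAll(data);
        System.out.println(Arrays.toString(data)); // [2, 4, 6, 8]
    }
}
```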
The ForkJoinTask class provides several methods for checking the execution status of a task. The isDone() method returns true if the task has completed in any way: normally, with an exception, or by cancellation. The isCompletedNormally() method returns true if the task completed without being cancelled or encountering an exception, and isCancelled() returns true if the task was cancelled. Lastly, isCompletedAbnormally() returns true if the task was either cancelled or encountered an exception.
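The following sketch exercises the four status methods on three tasks: one that succeeds, one that throws, and one cancelled before it is ever submitted. The MaybeFail task and the describe() helper are hypothetical. It waits with quietlyJoin(), which blocks until the task completes without rethrowing its exception, and reads the failure with getException().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class StatusDemo {
    // Hypothetical task: returns 42, or throws if asked to fail.
    static class MaybeFail extends RecursiveTask<Integer> {
        private final boolean fail;

        MaybeFail(boolean fail) {
            this.fail = fail;
        }

        @Override
        protected Integer compute() {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            return 42;
        }
    }

    // Summarizes the four status methods for one task.
    static String describe(MaybeFail task) {
        return "done=" + task.isDone()
                + " normal=" + task.isCompletedNormally()
                + " abnormal=" + task.isCompletedAbnormally()
                + " cancelled=" + task.isCancelled();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        MaybeFail ok = new MaybeFail(false);
        MaybeFail bad = new MaybeFail(true);
        MaybeFail never = new MaybeFail(false);
        pool.execute(ok);
        pool.execute(bad);
        never.cancel(true); // cancelled before it was ever submitted
        ok.quietlyJoin();   // wait for completion without rethrowing exceptions
        bad.quietlyJoin();
        System.out.println("ok:    " + describe(ok));
        System.out.println("bad:   " + describe(bad) + " exception=" + bad.getException());
        System.out.println("never: " + describe(never));
        pool.shutdown();
    }
}
```

Here ok should report done and completed normally, bad should report done and completed abnormally with an IllegalStateException from getException(), and never should report done, cancelled and completed abnormally.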
Implementation Sourcecode
FolderProcessor.java
package forkJoinDemoAsyncExample;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class FolderProcessor extends RecursiveTask<List<String>>
{
    private static final long serialVersionUID = 1L;
    // This attribute will store the full path of the folder this task is going to process.
    private final String path;
    // This attribute will store the extension of the files this task is going to look for.
    private final String extension;

    // Implement the constructor of the class to initialize its attributes.
    public FolderProcessor(String path, String extension)
    {
        this.path = path;
        this.extension = extension;
    }
    // Implement the compute() method. As you parameterized the RecursiveTask class
    // with the List<String> type, this method has to return an object of that type.
    @Override
    protected List<String> compute()
    {
        // List to store the full paths of the matching files stored in the folder.
        List<String> list = new ArrayList<String>();
        // FolderProcessor tasks to store the subtasks that are going to process the
        // subfolders stored in the folder.
        List<FolderProcessor> tasks = new ArrayList<FolderProcessor>();
        // Get the content of the folder.
        File file = new File(path);
        File[] content = file.listFiles();
        // For each element in the folder, if it is a subfolder, create a new
        // FolderProcessor object and execute it asynchronously using the fork() method.
        if (content != null)
        {
            for (int i = 0; i < content.length; i++)
            {
                if (content[i].isDirectory())
                {
                    FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension);
                    task.fork();
                    tasks.add(task);
                }
                // Otherwise, compare the extension of the file with the extension you are
                // looking for using the checkFile() method and, if they are equal, store
                // the full path of the file in the list of strings declared earlier.
                else
                {
                    if (checkFile(content[i].getName()))
                    {
                        list.add(content[i].getAbsolutePath());
                    }
                }
            }
        }
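        // If the list of the FolderProcessor subtasks has more than 50 elements,
        // write a message to the console to indicate this circumstance.
        if (tasks.size() > 50)
        {
            System.out.printf("%s: %d tasks ran.%n", file.getAbsolutePath(), tasks.size());
        }
        // Wait for every subtask and add its results to this task's own list.
        addResultsFromTasks(list, tasks);
        return list;
    }

    // join() waits for each subtask to finish and returns the list its compute() produced.
    private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks)
    {
        for (FolderProcessor item : tasks)
        {
            list.addAll(item.join());
        }
    }

    // A file matches if its name ends with the extension this task is looking for.
    private boolean checkFile(String name)
    {
        return name.endsWith(extension);
    }
}

Main.java
package forkJoinDemoAsyncExample;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main
{
    public static void main(String[] args)
    {
        // Default constructor: parallelism equals the number of available processors.
        ForkJoinPool pool = new ForkJoinPool();
        // Illustrative (assumed) root folders and extension; point these at the
        // folders and file type you actually want to scan.
        String extension = "log";
        FolderProcessor system = new FolderProcessor(System.getProperty("java.home"), extension);
        FolderProcessor apps = new FolderProcessor(System.getProperty("user.dir"), extension);
        FolderProcessor documents = new FolderProcessor(System.getProperty("user.home"), extension);
        // Send the three tasks to the pool asynchronously using execute().
        pool.execute(system);
        pool.execute(apps);
        pool.execute(documents);
        // Print the status of the pool once per second until all three tasks finish.
        do
        {
            System.out.printf("******************************************%n");
            System.out.printf("Main: Parallelism: %d%n", pool.getParallelism());
            System.out.printf("Main: Active Threads: %d%n", pool.getActiveThreadCount());
            System.out.printf("Main: Task Count: %d%n", pool.getQueuedTaskCount());
            System.out.printf("Main: Steal Count: %d%n", pool.getStealCount());
            try
            {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        } while ((!system.isDone()) || (!apps.isDone()) || (!documents.isDone()));
        // Shut down ForkJoinPool using the shutdown() method.
        pool.shutdown();
        // Write the number of results generated by each task to the console.
        List<String> results;
        results = system.join();
        System.out.printf("System: %d files found.%n", results.size());
        results = apps.join();
        System.out.printf("Apps: %d files found.%n", results.size());
        results = documents.join();
        System.out.printf("Documents: %d files found.%n", results.size());
    }
}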
How it works?
In the FolderProcessor class, each task processes the content of a folder. This content has the following two kinds of elements:
Files
Other folders
If the task finds a folder, it creates another FolderProcessor task to process that folder and sends it to the pool using the fork() method. fork() pushes the new task onto the current worker thread's queue, where it is either executed later by that thread or stolen by an idle worker. The method returns immediately, so the task can continue processing the content of the folder. For every file, the task compares its extension with the one it's looking for and, if they are equal, adds the full path of the file to the list of results.
Once the task has processed all the content of its assigned folder, it waits for all the tasks it sent to the pool to finish by calling join() on each of them. When called on a task, join() waits for that task to finish and returns the value returned by its compute() method. The task adds the results of all the subtasks it forked to its own results and returns that list as the return value of its compute() method.
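As a hedged variant of the same pattern, the sketch below replaces the "fork in a loop, then join each" approach with the static helper ForkJoinTask.invokeAll(Collection), which runs all the subtasks (forking all but one and running that one in the current thread) and returns only after every one of them has completed. FileCounter is a hypothetical class that counts matching files instead of collecting their paths.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of FolderProcessor: counts files whose names end with extension.
class FileCounter extends RecursiveTask<Long> {
    private final File folder;
    private final String extension;

    FileCounter(File folder, String extension) {
        this.folder = folder;
        this.extension = extension;
    }

    @Override
    protected Long compute() {
        long count = 0;
        List<FileCounter> subtasks = new ArrayList<>();
        File[] content = folder.listFiles();
        if (content != null) {
            for (File entry : content) {
                if (entry.isDirectory()) {
                    subtasks.add(new FileCounter(entry, extension));
                } else if (entry.getName().endsWith(extension)) {
                    count++;
                }
            }
        }
        // invokeAll returns only after every subtask has completed,
        // so each join() below returns that subtask's result without blocking.
        for (FileCounter subtask : ForkJoinTask.invokeAll(subtasks)) {
            count += subtask.join();
        }
        return count;
    }

    static long count(File root, String extension) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new FileCounter(root, extension));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        File root = new File(args.length > 0 ? args[0] : ".");
        System.out.println(count(root, ".log") + " .log files under " + root.getAbsolutePath());
    }
}
```

The trade-off is that invokeAll waits for all subtasks at once, whereas the original code forks each subtask as soon as it finds a folder and keeps scanning, which can start parallel work slightly earlier.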
Conclusion
Designing good multi-threaded algorithms is hard, and fork/join doesn't work in every circumstance. It's very useful within its own domain of applicability, but in the end you have to decide whether your problem fits the framework and, if not, be prepared to develop your own solution, building on the superb tools provided by the java.util.concurrent package.
References
http://gee.cs.oswego.edu/dl/papers/fj.pdf
http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
http://www.packtpub.com/java-7-concurrency-cookbook/book