Tuesday, March 12, 2013

Actor Model: Multi-threaded Parallel Processing in Java

Actor Model: Multi-threaded Parallel Processing in Java


The Actor model provides a constrained way to use multi-threaded parallel processing. Each Actor is used to process queued requests (or "messages") one at a time, as one stage of a pipeline. Below is one way to implement the Actor model in Java.


When multiple tasks need to be performed in a pipeline, there is sometimes a desire to execute them concurrently using separate threads, for example to take advantage of hardware with multiple CPU cores. But since concurrent sequential processing is notoriously prone to bugs that are difficult to replicate and isolate, it is helpful to use a programming model that imposes some structure on the use of separate threads and how they can interact.

In the Actor model, each actor runs in its own thread and only operates locally on its own queue of tasks. Multiple actors can be set up in a pipeline to work in parallel, each actor consuming the tasks in its own queue, and potentially adding tasks to the queues of other actors. For example, one actor may save RDF graphs from an HTTP endpoint, while another actor downstream later performs a computation on those graphs.

Another reason for using an actor that run in its own thread, processing one task at a time, is to impose throttling, so that too many threads are not trying to run at once. Throttling is not only helpful in preventing one client from consuming inordinate resources. In many cases it can actually improve total throughput, by preventing resource contention.


Each actor is represented by a separate Java class and has its own queue of similar tasks (or messages) that it will process, one at a time. Each task is represented as an instance of the actor class. A task's constructors create an instance that can be sent to the actor by calling the "execute" method on that instance, thus queuing the task for processing. The constructors take required parameters as arguments; optional parameters may be set via setter methods.

Each actor (a Java class) has its own thread, which it uses to asynchronously process the tasks in its queue. Different actors process different type of tasks in different queues and different threads. A different actor thread is held in a static field called "actor" in each task class. The actor is an instance of a standard class called ExecutorService that is provided by the JVM. The queue is inside the actor and maintained by the actor, so custom code does not need to deal with the queue maintenance, thus (hopefully) reducing the opportunity for thread programming errors.

Each task class must implement a "run" method, which will be called by the JVM when it is time to process one instance (or message).

Sample Code

We'll sketch out how to define a task class called GraphReaderTask, which will read RDF graphs from a set of URLs, and store those graphs into an RDF repository using Sesame. First, we'll need to import some of the standard Java concurrency classes:
import java.util.concurrent.*;
Our GraphReaderTask class (or a nested class) must implement the Callable interface:
public class GraphReaderTask implements Callable<Void> {
Here is the static "actor" that holds the thread for the GraphReaderTask:
private static final ExecutorService actor = Executors.newSingleThreadExecutor();
Next, some fields that the GraphReaderTask will need, in processing each message. Each message
private final Repository repository; // A Sesame RDF repository
private final String url; // The URL of an RDF graphs to save
private Future<Void> ctrl;
Now we can define a GraphReaderTask constructor and control methods:
public GraphReaderTask(Repository repository, String url) {
        this.repository = repository;
        this.url = url;
public boolean isSubmitted() {
 return ctrl != null;
public boolean isCancelled() {
 return ctrl != null && ctrl.isCancelled();
public boolean isDone() {
 return ctrl != null && ctrl.isDone();
public synchronized void submit() {
 if (ctrl == null) {
  ctrl = actor.submit(this);
 } else {
  throw new IllegalStateException();
public boolean cancel() {
 return ctrl.cancel(false);
public void await() throws InterruptedException, IOException,
  OpenRDFException {
 try {
 } catch (ExecutionException e) {
  try {
   throw e.getCause();
  } catch (Error cause) {
   throw cause;
  } catch (RuntimeException cause) {
   throw cause;
  } catch (IOException cause) {
   throw cause;
  } catch (OpenRDFException cause) {
   throw cause;
  } catch (Throwable cause) {
   throw new UndeclaredThrowableException(cause);
Now we can define a "call" method, which will be invoked by the actor when it is time to process the next task from this actor's queue, and must perform the guts of whatever this actor/task should do. In this example, the GraphReaderTask simply reads an RDF graph from a URL and stores it into our Sesame repository.

Remember that all instances of GraphReaderTask are associated with one actor, that is an ExecutorService, which provides features for shutting down gracefully. So before we actually start doing any work, we first need to check to see if the task is cancelled, and if so, merely return without doing anything (except perhaps writing a note to a log).
public Void call() throws IOException, OpenRDFException {
 if (isCancelled())
  return null;
 URLConnection http = new URL(url).openConnection();
 http.setRequestProperty("Accept", "application/rdf+xml");
 InputStream in = http.getInputStream();

 RepositoryConnection con = repository.getConnection();
 try {
  ValueFactory vf = con.getValueFactory();
  URI graph = vf.createURI(url);

  if (isCancelled())
   return null;
  con.add(in, url, RDFFormat.RDFXML, graph);

 } finally {
 return null;
Now that we have defined the GraphReaderTask class, we need to make use of it.
new GraphReaderTask(repository, url).submit();
This technique allows the caller thread to continue with other processing, while the dedicated graph reader thread takes care of parsing RDF. By using a queue we ensure that threads are not blocked when they could be performing other operations. The await() method can be used by the caller to re-join when the task is complete and propagate any exceptions that may have occurred.