
Using blocking queues with ExecutorService

I read this totally inadequate explanation of blocking queues on "Hacker" "News", and thought I'd cover some ground they missed.

How do we use blocking queues in conjunction with an ExecutorService, the standard way of working with threads in Java? And how can we string multiple thread pools together into a pipeline?

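ThreadPoolExecutor, the standard ExecutorService implementation, hands tasks to its queue with offer, not put. Instead of blocking when the queue is full, offer returns false and the executor rejects the task (by default with a RejectedExecutionException). We have to define our own blocking queue that overrides offer and calls the blocking put instead.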

import java.util.concurrent.LinkedBlockingQueue;

public class ExecutorBlockingQueue<T> extends LinkedBlockingQueue<T> {

  public ExecutorBlockingQueue(int size) {
    super(size);
  }

  // ExecutorService calls offer, not put, so we'll "intercept" it here
  @Override
  public boolean offer(T t) {
    try {
      put(t);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    return false;
  }

}
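To see the difference in behaviour, here is a small sketch that submits four tasks to a single-threaded pool with a queue of size one, first with a plain LinkedBlockingQueue and then with a queue that blocks in offer. The class and method names (OfferVersusPutDemo, PutOnOfferQueue, rejectionsWithPlainQueue, rejectionsWithBlockingOffer) are made up for this illustration, and PutOnOfferQueue is just a copy of ExecutorBlockingQueue so the file compiles on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OfferVersusPutDemo {

  // Hypothetical copy of ExecutorBlockingQueue: offer() blocks by delegating to put().
  static class PutOnOfferQueue<T> extends LinkedBlockingQueue<T> {
    PutOnOfferQueue(int size) {
      super(size);
    }

    @Override
    public boolean offer(T t) {
      try {
        put(t);
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  // Plain bounded queue: one task runs, one is queued, the rest are rejected.
  public static int rejectionsWithPlainQueue() throws InterruptedException {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1);
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, queue);
    CountDownLatch gate = new CountDownLatch(1); // keeps the worker busy while we submit
    int rejected = 0;
    for (int i = 0; i < 4; i++) {
      try {
        pool.execute(() -> {
          try {
            gate.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      } catch (RejectedExecutionException e) {
        rejected++;
      }
    }
    gate.countDown();
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    return rejected;
  }

  // Blocking offer: execute() waits for space in the queue instead of rejecting.
  public static int rejectionsWithBlockingOffer() throws InterruptedException {
    BlockingQueue<Runnable> queue = new PutOnOfferQueue<>(1);
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, queue);
    int rejected = 0;
    for (int i = 0; i < 4; i++) {
      try {
        pool.execute(() -> {
          try {
            Thread.sleep(20); // pretend to work
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      } catch (RejectedExecutionException e) {
        rejected++;
      }
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    return rejected;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("plain queue rejections: " + rejectionsWithPlainQueue());
    System.out.println("blocking offer rejections: " + rejectionsWithBlockingOffer());
  }
}
```

With the plain queue, the third and fourth tasks have nowhere to go and are rejected. With the blocking offer, the submitting thread simply waits, which is the backpressure we are after.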

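Then define a static factory method that hands an instance of our fancy new blocking queue to an executor. The core and maximum pool sizes are the same, so the executor starts all of its threads before it begins queueing tasks.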

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Executors2 {

  private static final Logger LOG = LogManager.getLogger(Executors2.class);

  public static ExecutorService blockingExecutorService(int queueSize, int threadCount) {
    BlockingQueue<Runnable> queue = new ExecutorBlockingQueue<>(queueSize);
    return new ThreadPoolExecutor(threadCount, threadCount, 20, TimeUnit.SECONDS, queue);
  }

  public static void blockingShutdown(ExecutorService executorService) throws InterruptedException {
    executorService.shutdown();
    while (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
      LOG.debug("Waiting for ExecutorService to terminate...");
    }
  }

}
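The shutdown loop relies on two documented behaviours: shutdown() stops new submissions but lets already-queued tasks run, and awaitTermination returns false when it times out before the pool has finished. A minimal sketch of that, using a stock fixed thread pool and made-up names (ShutdownDrainDemo, runAndDrain):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownDrainDemo {

  // Queues `tasks` slow tasks on a 2-thread pool, shuts down, and returns how many finished.
  public static int runAndDrain(int tasks) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger finished = new AtomicInteger();

    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        try {
          Thread.sleep(20); // pretend to work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        finished.incrementAndGet();
      });
    }

    pool.shutdown(); // no new tasks are accepted, queued tasks still run
    while (!pool.awaitTermination(50, TimeUnit.MILLISECONDS)) {
      // timed out: the pool is still working through its queue
    }
    return finished.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runAndDrain(10) + " tasks finished before the loop exited");
  }
}
```

By the time the loop exits, every task that was queued before shutdown() has run.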

Now you can easily chain together executors in a pipeline, defining a Runnable for each stage of work.

private Runnable downloadRunnable(String url) {
  return () -> {
    // Do Work...
    // result here is whatever the download produced (the page HTML)
    parseExecutor.execute(parseRunnable(result));
  };
}

private Runnable parseRunnable(String html) {
  return () -> {
    // Do Work...
    // result here is whatever parsing the HTML produced
    outputExecutor.execute(outputRunnable(result));
  };
}

private Runnable outputRunnable(String result) {
  return () -> {
    // Do Work...
  };
}

Finally, set up your executors and feed the first stage from your main thread. The blockingShutdown method waits for an executor to finish its queued work, so the whole pipeline drains as long as you call it on the executors in the same order the work flows through them.

ExecutorService downloadExecutor = Executors2.blockingExecutorService(10, 10);
ExecutorService parseExecutor = Executors2.blockingExecutorService(100, 10);
ExecutorService outputExecutor = Executors.newSingleThreadExecutor();

// ...

while (hasNextUrl()) downloadExecutor.execute(downloadRunnable(getNextUrl()));

Executors2.blockingShutdown(downloadExecutor);
Executors2.blockingShutdown(parseExecutor);
Executors2.blockingShutdown(outputExecutor);
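Putting the pieces together, here is one way the whole pipeline might look as a single runnable file. It is a sketch under assumptions: the download and parse steps are faked with string operations, and the names PipelineSketch, PutOnOfferQueue, blockingExecutor, drain and run are invented for the example (they mirror ExecutorBlockingQueue, blockingExecutorService and blockingShutdown above).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PipelineSketch {

  // Hypothetical copy of ExecutorBlockingQueue: offer() blocks by delegating to put().
  static class PutOnOfferQueue<T> extends LinkedBlockingQueue<T> {
    PutOnOfferQueue(int size) {
      super(size);
    }

    @Override
    public boolean offer(T t) {
      try {
        put(t);
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  // Mirrors Executors2.blockingExecutorService.
  static ExecutorService blockingExecutor(int queueSize, int threadCount) {
    BlockingQueue<Runnable> queue = new PutOnOfferQueue<>(queueSize);
    return new ThreadPoolExecutor(threadCount, threadCount, 20, TimeUnit.SECONDS, queue);
  }

  // Mirrors Executors2.blockingShutdown, without the logging.
  static void drain(ExecutorService executor) throws InterruptedException {
    executor.shutdown();
    while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
      // still working through queued tasks
    }
  }

  // Runs every URL through download -> parse -> output and returns what the last stage collected.
  public static List<String> run(List<String> urls) throws InterruptedException {
    ExecutorService downloadExecutor = blockingExecutor(2, 2);
    ExecutorService parseExecutor = blockingExecutor(4, 2);
    ExecutorService outputExecutor = Executors.newSingleThreadExecutor();
    List<String> output = new CopyOnWriteArrayList<>();

    for (String url : urls) {
      downloadExecutor.execute(() -> {
        String html = "<html>" + url + "</html>"; // fake download
        parseExecutor.execute(() -> {
          String parsed = html.replaceAll("<[^>]+>", ""); // fake parse: strip the tags
          outputExecutor.execute(() -> output.add(parsed));
        });
      });
    }

    // Shut down in the order the work flows, so each stage is done feeding the next.
    drain(downloadExecutor);
    drain(parseExecutor);
    drain(outputExecutor);
    return output;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run(Arrays.asList("a.example", "b.example", "c.example")));
  }
}
```

The results can arrive in any order, since the first two stages run on several threads. The small queue sizes are deliberate: they make the main thread and the download threads block as soon as a downstream stage falls behind.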