Parallel Collection Processing: Without Parallel Streams (1/3)

Stream API is a convenient tool for collection processing, especially if you need to parallelize the execution of CPU-intensive tasks.

However, unsurprisingly, such convenience comes at a price, so it’s worth knowing how to achieve the same result without relying on the tool.

Downsides of Parallel Streams

Parallel collection processing using Stream API can be as easy as:

List<Integer> result = list.parallelStream()
  .map(toSomething())
  .collect(Collectors.toList());

Unfortunately, we can’t choose an execution facility, define per-instance parallelism, or avoid blocking the calling thread.

What’s more, the hardwired thread pool is the static common ForkJoinPool, which is the default execution facility not only for parallel streams but also for multiple methods of CompletableFuture, and potentially even Project Loom’s virtual threads (fibers). Polluting such a shared thread pool with long-running operations could be disastrous for performance.
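To see the shared pool in action, we can record which threads a parallel stream actually runs on. The sketch below is only an illustration: CommonPoolDemo and threadsUsedByParallelStream are made-up names, and the worker thread naming is an implementation detail observed on OpenJDK rather than anything guaranteed by the specification.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CommonPoolDemo {

    // Returns the names of all threads that took part in one parallel stream run.
    static Set<String> threadsUsedByParallelStream() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        List<Integer> input = IntStream.range(0, 1_000).boxed()
          .collect(Collectors.toList());

        input.parallelStream()
          .map(i -> {
              // Record the thread that processes this element
              names.add(Thread.currentThread().getName());
              return i;
          })
          .collect(Collectors.toList());

        return names;
    }

    public static void main(String[] args) {
        // On OpenJDK this typically shows the calling thread
        // next to ForkJoinPool.commonPool-worker-N threads
        System.out.println(threadsUsedByParallelStream());
    }
}
```

Note that the calling thread usually shows up in the output as well, which is the "blocking the calling thread" part mentioned above.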

We could mitigate the issue by using a ManagedBlocker, but it turns out that the use of ForkJoinPool is not a part of the public contract:

Note, however, that this technique (…) is an implementation “trick” and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified.

source: Stuart Marks @ Stack Overflow
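For reference, a ManagedBlocker-based mitigation could look roughly like the sketch below. SleepBlocker and slowIdentity are hypothetical names used only for illustration, while ForkJoinPool.ManagedBlocker and ForkJoinPool.managedBlock are part of the JDK. Whether this actually helps inside a parallel stream depends on the unspecified pool mentioned above, so treat it as an experiment and not as a guarantee.

```java
import java.util.concurrent.ForkJoinPool;

class SleepBlocker implements ForkJoinPool.ManagedBlocker {

    private final long millis;
    private boolean done;

    SleepBlocker(long millis) {
        this.millis = millis;
    }

    @Override
    public boolean block() throws InterruptedException {
        Thread.sleep(millis); // the blocking call we want the pool to know about
        done = true;
        return true;          // true means: no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return done;          // lets the pool skip block() once we are finished
    }

    // Hypothetical slow identity function that announces its blocking to the pool.
    static int slowIdentity(int i) {
        try {
            // Inside a ForkJoinPool worker, the pool may compensate for the blocked thread;
            // on any other thread this simply runs block() until isReleasable() is true
            ForkJoinPool.managedBlock(new SleepBlocker(100));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return i;
    }
}
```

It could then be plugged into a pipeline with list.parallelStream().map(SleepBlocker::slowIdentity).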

But what to do if we want to process collections using long-running and/or blocking tasks?

Parallel Collection Processing Without Parallel Streams

Our task will be to take a simple list of integers and process them using a slow identity function.
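The article doesn’t show the slow identity function itself, so here is a minimal stand-in. The 100 ms delay and the way interruption is handled are assumptions made for this sketch, not the original implementation.

```java
class Utils {

    // Hypothetical slow identity function:
    // sleeps for a fixed delay and returns its argument unchanged.
    static Integer process(Integer input) {
        try {
            Thread.sleep(100); // assumed delay, simulates a slow/blocking call
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail the task
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing " + input, e);
        }
        return input;
    }
}
```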

The first thing you will notice is that in all the examples below, we need to manually manage our resources, which is something that standard Parallel Streams hide from us by relying on a static thread pool. Keep in mind that proper tuning of thread pools will be covered in a separate article.

Before we dig into contemporary techniques, let’s quickly revisit how something like this could be achieved using legacy Java versions.

Before Java 8

Let’s start with a point of reference, which is what sequential processing would look like:

List<Integer> results = new ArrayList<>();

for (Integer integer : integers) {
    results.add(Utils.process(integer));
}

As mentioned above, to be able to process anything asynchronously, we need a thread or a thread pool on which we can schedule our operations:

ExecutorService executor = Executors.newFixedThreadPool(10);

And now, we can submit all operations to the above thread pool:

List<Future<Integer>> futures = new ArrayList<>();

for (Integer integer : integers) {
    Future<Integer> result = executor.submit(
      () -> Utils.process(integer));
    futures.add(result);
}

At this point, the processing is already parallelized, and all we need to do is wait for the processing to end and repackage our results into another list instance (and potentially free no-longer-needed resources):

List<Integer> results = new ArrayList<>();

for (Future<Integer> task : futures) {
    results.add(task.get());
}

In this setup, results will maintain the original order.
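Putting the steps above together, a self-contained version could look like the sketch below. The class name, the 100 ms process method, and the exception handling are assumptions added here so that the example compiles on its own; the important extra is the finally block, which frees the thread pool once we are done.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceExample {

    // Hypothetical slow identity function
    static Integer process(Integer i) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return i;
    }

    static List<Integer> processAll(List<Integer> integers) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            // 1. Submit everything first, so that the tasks run in parallel
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer integer : integers) {
                futures.add(executor.submit(() -> process(integer)));
            }

            // 2. Only then wait for the results, in submission order
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // 3. Release the threads, otherwise they keep the JVM alive
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```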

However, it’s not difficult to process results as they arrive by relying on a CompletionService:

CompletionService<Integer> completionService =
  new ExecutorCompletionService<>(executor);

for (Integer integer : integers) {
    completionService.submit(() -> Utils.process(integer));
}

List<Integer> results = new ArrayList<>();

for (int i = 0; i < integers.size(); i++) {
    results.add(completionService.take().get());
}
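To make the difference in ordering visible, the sketch below gives each task a different delay. CompletionOrderExample and sleepAndReturn are hypothetical names, and the delays are arbitrary values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderExample {

    // Hypothetical task: sleeps for the given number of milliseconds and returns it.
    static Integer sleepAndReturn(Integer millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return millis;
    }

    static List<Integer> inCompletionOrder(List<Integer> delays) {
        // One thread per task, so that all tasks start more or less at once
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, delays.size()));
        try {
            CompletionService<Integer> completionService =
              new ExecutorCompletionService<>(executor);

            for (Integer delay : delays) {
                completionService.submit(() -> sleepAndReturn(delay));
            }

            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < delays.size(); i++) {
                // take() blocks until the next task finishes, whichever it is
                results.add(completionService.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // The shortest delay is expected to come out first
        System.out.println(inCompletionOrder(Arrays.asList(300, 200, 100)));
    }
}
```

With delays this far apart, the results should normally arrive shortest-first, but the exact order ultimately depends on thread scheduling.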

As you can see, it wasn’t tough, but the code is unnecessarily imperative and not composable, which could be painful if we wanted to apply some advanced processing.

Additionally, it’s non-trivial to control the parallelism level and introduce short-circuiting on exceptions.

Luckily, Java 8’s CompletableFuture introduced a significant improvement when it comes to dealing with asynchronous computations.

Since Java 8

Java 8 was a revolution that brought us tools like Stream API and CompletableFuture.

Luckily, if we don’t want to use Parallel Streams, we can still use Stream API for the orchestration of parallel calls!

So, this time, our point of reference is:

List<Integer> results = integers.stream()
  .map(Utils::process)
  .collect(toList());

We’ll need a thread pool to schedule our operations on:

ExecutorService executor = Executors.newFixedThreadPool(10);

And introducing parallelism is as easy as:

integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  ...

However, this is where things start getting tricky.

You might be tempted to finish the above by adding a single .map(CompletableFuture::join) line and call it a day, but it won’t work as expected because of the nature of Stream API:

List<Integer> results = integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  .map(CompletableFuture::join)
  .collect(Collectors.toList());

Streams are evaluated lazily, one element at a time, which means that the above pipeline would submit each operation and immediately wait for its result before submitting the next one. The processing time could easily end up longer than when processing the collection sequentially!

And I’ve seen this in production more than once, so beware!

That means that we need first to make sure that all operations are submitted, and only then start processing results. Thankfully, Stream API Collectors are incredibly composable, so the solution is relatively straightforward:

List<Integer> results = integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  .collect(collectingAndThen(toList(), list -> list.stream()
    .map(CompletableFuture::join)
    .collect(toList())));

As you can see, we collect all CompletableFutures first, and then (pun intended) wait for results to arrive.
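If you want to see the difference for yourself, a rough timing comparison like the one below should do. LazyJoinDemo, its method names, the 100 ms delay, and the pool size are all assumptions made for this sketch, and the measured numbers will vary from machine to machine.

```java
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class LazyJoinDemo {

    // Hypothetical slow identity function
    static Integer process(Integer i) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return i;
    }

    // Submits and joins element by element: effectively sequential
    static List<Integer> joinInSamePipeline(List<Integer> integers, Executor executor) {
        return integers.stream()
          .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
          .map(CompletableFuture::join)
          .collect(toList());
    }

    // Submits everything first, joins afterwards: actually parallel
    static List<Integer> collectThenJoin(List<Integer> integers, Executor executor) {
        return integers.stream()
          .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
          .collect(collectingAndThen(toList(), list -> list.stream()
            .map(CompletableFuture::join)
            .collect(toList())));
    }

    static long millis(Supplier<List<Integer>> action) {
        long start = System.nanoTime();
        action.get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Returns { time of the naive pipeline, time of the fixed pipeline } in milliseconds
    static long[] measure() {
        List<Integer> integers = Arrays.asList(1, 2, 3, 4);
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            long naive = millis(() -> joinInSamePipeline(integers, executor));
            long fixed = millis(() -> collectThenJoin(integers, executor));
            return new long[] {naive, fixed};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] timings = measure();
        System.out.println("join in the same pipeline: " + timings[0] + " ms");
        System.out.println("collect, then join:        " + timings[1] + " ms");
    }
}
```

With four tasks of 100 ms each, the first variant should take roughly four times as long as the second one.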

However, the above has one huge deficiency – lack of short-circuiting on exceptions.

Have a look at the example below:

@Test
void example_no_shortcircuit() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    List<Integer> results = IntStream.range(0, 10).boxed()
      .map(i -> CompletableFuture.supplyAsync(() -> {
          if (i != 9) {
              try {
                  Thread.sleep(10000);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              return i;
          } else {
              throw new RuntimeException();
          }
      }, executor))
      .collect(collectingAndThen(toList(), list -> list.stream()
        .map(CompletableFuture::join)
        .collect(toList())));
}

Even though the exception is thrown immediately, we still need to wait for other operations to complete… only to receive the exception. What a waste of time!

We could bypass the above problem by relying on the revised implementation of CompletableFuture#allOf instead:

static <T> CompletableFuture<List<T>> allOfOrException(Collection<CompletableFuture<T>> futures) {
    CompletableFuture<List<T>> result = futures.stream()
      .collect(collectingAndThen(
        toList(),
        l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
          .thenApply(__ -> l.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()))));

    for (CompletableFuture<?> f : futures) {
        f.handle((__, ex) -> ex == null 
          || result.completeExceptionally(ex));
    }

    return result;
}

More details about the above implementation can be found in a separate article about CompletableFuture#allOf.
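To check that the helper really fails fast, we can rerun the earlier scenario and measure how long it takes for the exception to surface. In the sketch below, allOfOrException is restated in a slightly simplified but equivalent form so that the block compiles on its own; ShortCircuitDemo, millisUntilFailure, and the 2-second delay are assumptions made for illustration.

```java
import static java.util.stream.Collectors.toList;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

class ShortCircuitDemo {

    static <T> CompletableFuture<List<T>> allOfOrException(
      Collection<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> list = new ArrayList<>(futures);

        // Completes normally once all futures are done
        CompletableFuture<List<T>> result = CompletableFuture
          .allOf(list.toArray(new CompletableFuture[0]))
          .thenApply(__ -> list.stream().map(CompletableFuture::join).collect(toList()));

        // Completes exceptionally as soon as any single future fails
        for (CompletableFuture<T> f : list) {
            f.whenComplete((__, ex) -> {
                if (ex != null) {
                    result.completeExceptionally(ex);
                }
            });
        }
        return result;
    }

    static Integer slow(Integer i) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return i;
    }

    // Returns how long it took for the failure to propagate, in milliseconds
    static long millisUntilFailure() {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            List<CompletableFuture<Integer>> futures = IntStream.range(0, 10).boxed()
              .map(i -> CompletableFuture.supplyAsync(() -> {
                  if (i == 9) {
                      throw new IllegalStateException("boom");
                  }
                  return slow(i);
              }, executor))
              .collect(toList());

            long start = System.nanoTime();
            try {
                allOfOrException(futures).join();
                throw new AssertionError("expected the aggregated future to fail");
            } catch (CompletionException expected) {
                return (System.nanoTime() - start) / 1_000_000;
            }
        } finally {
            executor.shutdownNow(); // interrupt the tasks that are still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println("failed after " + millisUntilFailure() + " ms");
    }
}
```

The failure should be reported well before the two seconds that the remaining tasks would need. Keep in mind that those tasks keep running in the background unless we cancel them or shut the executor down, as done in the finally block here.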

So, after encapsulating all the above complexity, the short-circuiting version would look like the following:

List<Integer> results = integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  .collect(collectingAndThen(toList(), l -> allOfOrException(l).join()));

This is quite decent already. However, there’s still some low-hanging fruit to pick.

In the example above, we wait for all the results to arrive by calling the join() method on the aggregated future containing the list of results. But what if, instead of blocking, we returned the future itself and let users decide when to wait for the result, if at all?

CompletableFuture<List<Integer>> results = integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  .collect(collectingAndThen(toList(), l -> allOfOrException(l)));

That would result in a much more versatile tool that could leverage the power of CompletableFuture and enable the basics of reactive programming in core Java (note that orTimeout, used below, requires Java 9 or later):

integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  .collect(collectingAndThen(toList(), l -> allOfOrException(l)))
  .orTimeout(1000, MILLISECONDS)
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));

This can be further packaged into a single static method:

public static <T, R> CompletableFuture<List<R>> inParallel(
  Collection<? extends T> source, Function<T, R> mapper, Executor executor) {
    return source.stream()
      .map(i -> supplyAsync(() -> mapper.apply(i), executor))
      .collect(collectingAndThen(toList(), l -> allOfOrException(l)));
}
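Calling such a method could look like the sketch below. Both helpers are repeated in a slightly simplified form so that the block compiles on its own; ParallelMapDemo, squares, and the pool size are assumptions made for illustration.

```java
import static java.util.stream.Collectors.toList;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ParallelMapDemo {

    static <T> CompletableFuture<List<T>> allOfOrException(
      Collection<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> list = new ArrayList<>(futures);
        CompletableFuture<List<T>> result = CompletableFuture
          .allOf(list.toArray(new CompletableFuture[0]))
          .thenApply(__ -> list.stream().map(CompletableFuture::join).collect(toList()));
        for (CompletableFuture<T> f : list) {
            f.whenComplete((__, ex) -> {
                if (ex != null) {
                    result.completeExceptionally(ex); // fail fast
                }
            });
        }
        return result;
    }

    static <T, R> CompletableFuture<List<R>> inParallel(
      Collection<? extends T> source, Function<T, R> mapper, Executor executor) {
        // Submit all operations first, aggregate afterwards
        List<CompletableFuture<R>> futures = source.stream()
          .map(i -> CompletableFuture.supplyAsync(() -> mapper.apply(i), executor))
          .collect(toList());
        return allOfOrException(futures);
    }

    static List<Integer> squares() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> input = Arrays.asList(1, 2, 3, 4);
            CompletableFuture<List<Integer>> future =
              inParallel(input, i -> i * i, executor);
            // The caller decides when (and whether) to block
            return future.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares());
    }
}
```

Since the futures are joined in the order in which they were collected, the resulting list keeps the order of the source collection.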

However, the price to be paid is a drastic increase in complexity. And we still don’t have a convenient way to put a cap on maximum parallelism.

This is why I created a tool that encapsulates all of the above into a set of convenient collectors that can be used without the above hassle – parallel-collectors.

More about it in the next article from the series.

Conclusion

Stream API is a great tool for collection processing, especially if you need to parallelize the execution of CPU-intensive tasks. Unfortunately, it’s not the best choice for running blocking operations even when using ManagedBlocker.

Luckily, we can orchestrate parallel collection processing manually, or use a tool like parallel-collectors.

Remember, even though it’s relatively easy to parallelize things, it doesn’t always mean that you should. Parallelism comes with a price that can often be higher than the price of not using it at all. Threads are expensive to create, maintain, and switch between, and you can only create a limited number of them.

It’s important to follow up on the root cause and double-check if parallelism is the way to go.

It often turns out that the root cause can be addressed by using a simple JOIN statement, batching, reorganizing your data… or even just by choosing a different API method.

Code samples can be found on GitHub.
