Java - Structured Concurrency Using StructuredTaskScope

This tutorial shows you how to create a structured concurrency in Java using StructuredTaskScope, including how to use ShutDownOnFailure, ShutDownOnSuccess, or custom policy.

Java allows you to execute code concurrently by creating threads. Java 19 brought an improvement by introducing a feature called virtual threads, which is lighter and easier to use than the traditional threads. Therefore, you can create virtual threads to run tasks concurrently. However, if you have multiple tasks executed in different threads, one of the problems that commonly arises is how to control when to cancel the execution of tasks. There are some different policies for it. For example, when a task fails, the other tasks should not be continued since the result will be useless. In another policy, we only want to get the result of a task that completes first. Handling such cases can be quite complex.

Fortunately, Java 21 introduced a feature called structured concurrency. It adds a class named StructuredTaskScope, which is designed to streamline error handling and cancellation. Besides reducing the effort for writing the code, it can also improve the readability of the code.

Using StructuredTaskScope

To create a structured concurrency using StructuredTaskScope, you have to create an instance of it. The StructuredTaskScope class implements AutoCloseable. Therefore, you can use a try-with-resources block.

Inside the block, you need to create some subtasks to be executed using the fork(Callable) method. Then, call the join() or joinUntil(Instant) method of the scope, which makes it wait until a certain condition meets. For example, you can wait until a subtask has completed or until any subtask fails, depending on the shutdown policy used.

StructuredTaskScope States

A StructuredTaskScope has three states: OPEN, SHUTDOWN, and CLOSED. The order of the states are OPEN -> SHUTDOWN -> CLOSED. It's not possible for the state to go backward.

The initial state is OPEN. In this state, it's possible to fork new subtasks while waiting for existing subtasks to complete.

The second state is SHUTDOWN. Any thread can set the scope's state to SHUTDOWN, not necessarily the scope owner. In this state, all subtasks have been completed (either success or failed) or interrupted. If you try to fork a new subtask, it won't be executed. Updating to this state can be done by using the shutdown() method. Usually, it's already handled by the classes that extend StructuredTaskScope, such as ShutDownOnFailure and ShutDownOnSuccess, when a certain condition meets. However, you can also call it manually if necessary. The isShutdown() method can be used to check whether the scope is already shut down.

The last state is CLOSED. Only the scope owner can update to this state. In this state, it's not allowed to fork a new subtask or call the join() or joinUntil(Instant) method. If you do one of them, an IllegalStateException exception will be thrown. Updating the state to CLOSED is handled inside the close() method. You don't need to manually call the method if you use a try-with-resources block.

Create Subtasks

To create a subtask to be run concurrently, you can use the fork(Callable) method of StructuredTaskScope. You have to pass a Callable to be run as the argument.

  public <U extends T> Subtask<U> fork(Callable<? extends U> task)

The method starts a new virtual thread to execute a value-returning method that's passed as the argument. It creates the new thread using the ThreadFactory of the scope. The current ScopedValue bindings are inherited to the new thread as well.

The return type of the fork method is Subtask, which represents a forked subtask. Using the returned Subtask object, you can get the current state of the subtask.

Get Subtask State

First, you can get the state of the subtask by calling the state() method which returns an enum whose values are:

  • UNAVAILABLE: The subtask has been forked but not completed or forked/completed after the scope was shut down. The result or exception is not available.
  • SUCCESS: The subtask completed successfully with a result. The result can be obtained by using the get() method.
  • FAILED: The subtask failed with an exception. The exception can be obtained by using the exception() method.

Example:

  State state = mySubtask.state();

Call join() Method

The join() method is used to wait for all subtasks of the scope to finish or the task scope to shut down. The call to the method is required if you want to get the result or exception of the scope.

  public StructuredTaskScope<T> join() throws InterruptedException

There are several rules regarding how to call the join() method.

  • It can only be invoked by the scope owner. If you call it from a forked thread for example, it will throw java.lang.WrongThreadException: Current thread not owner.
  • It can only be invoked if the state of the scope is not CLOSED. If the scope is already closed and you call the join() method, you'll get java.lang.IllegalStateException: Task scope is closed exception.
  • It may throw InterruptedException if the thread is interrupted while waiting.
  List<Double> getResult() throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      Subtask<Double> subtask1 = scope.fork(Math::random);
      Subtask<Double> subtask2 = scope.fork(Math::random);

      scope.join();

      // Get result or exception
      return List.of(subtask1.get(), subtask2.get());
    }
  }

There is also an alternative method called joinUntil(Instant) which does the similar thing, but with a deadline. It can also throw a TimeoutException if the waiting time exceeds the deadline.

  public StructuredTaskScope<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException

Get Subtask Result

If a subtask has completed successfully, you can get the returned value by using the get() method.

  Subtask<Double> mySubtask = scope.fork(Math::random);
  // Other subtasks

  scope.join();

  Double subtask1Result = mySubtask.get();

The method may throw an IllegalStateException in these conditions:

  • If the owner did not join after forking subtask
  • If the subtask has not completed (status: UNAVAILABLE).
  • If the subtask completed unsuccessfully (status: FAILED).

Therefore, you have to make sure to only call the get() only after the join() or joinUntil(Instant) method of the scope has been called and the subtask status is SUCCESS.

Get Subtask Exception

A subtask can also fail. If that happens, you may want to get the exception in order to check the cause. It can be done by calling the exception() method.

  Subtask<Double> mySubtask = scope.fork(() -> {
    throw new RuntimeException();
  });
  // Other subtasks

  scope.join();

  Throwable subtask1Exception = mySubtask.exception();

The method may throw an IllegalStateException in these conditions:

  • If the owner did not join after forking subtask
  • If the subtask has not completed (status: UNAVAILABLE).
  • If the subtask completed with a result (status: SUCCESS).

That means you should only call the exception() after calling the join() or joinUntil(Instant) method of the scope and if the subtask completed with an exception.

Shut Down Policies

If there are several subtasks running concurrently, sometimes it would be preferred to cancel unfinished subtasks on a certain condition. For example, if one of the subtasks fails, the others should be canceled because the results will be useless. However, in another case, we only want to get the result from any subtask that completes successfully first. Java already provides two implementations of StructuredTaskScope, ShutdownOnFailure and ShutdownOnSuccess to handle those two different policies.

ShutdownOnFailure Policy

With this policy, the scope's shutdown() method will be called when any subtask throws an exception. It causes the unfinished threads to be interrupted and awakens the task scope owner. This policy is suitable for cases where you need all subtasks to complete successfully because the results will be useless if any of them fails.

The ShutdownOnFailure class also provides some additional methods. There is a method called exception, which is used to return the exception of the first subtask that failed. If there is no failed subtask, it will return an empty Optional.

  public Optional<Throwable> exception()

It also has methods named throwIfFailed for throwing an exception when any subtask fails.

  public void throwIfFailed() throws ExecutionException
  public <X extends Throwable> void throwIfFailed(Function<Throwable, ? extends X> esf) throws X

There are several rules for calling the exception and throwIfFailed methods.

  • The methods can only be called by the scope owner. Otherwise, it will throw a WrongThreadException.
  • The methods can only be called after the join() or joinUntil(Instant) method has been called. Otherwise, it will throw an IllegalStateException.
  • The parameter-less throwIfFailed method will throw an ExecutionException if there is a failed subtask with the exception of the first failed subtask as the cause. Meanwhile, the throwIfFailed method that has a parameter allows you to throw any exception by passing a function.

In the example below, there is a structured concurrency using the ShutdownOnFailure policy with two subtasks.

  public class ShutDownOnFailureExample {
  
    record HighScore(String user, Integer score) {}
    record Result(Integer userScore, List<HighScore> highScores) {}
  
    private Integer getUserScore() throws InterruptedException {
      int delay = 100;
      System.out.println("getUserScore will be delayed for " + delay + " ms");
      Thread.sleep(delay);
      return 50;
    }
  
    private List<HighScore> getHighScores() throws InterruptedException {
      int delay = 50;
      System.out.println("getHighScores will be delayed for " + delay + " ms");
      Thread.sleep(delay);
      return List.of(
          new HighScore("Player A", 100),
          new HighScore("Player B", 90),
          new HighScore("Player C", 80)
      );
    }
  
    Result getData() throws ExecutionException, InterruptedException {
      try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        StructuredTaskScope.Subtask<Integer> userScoreSubtask = scope.fork(this::getUserScore);
        StructuredTaskScope.Subtask<List<HighScore>> highScoresSubtask = scope.fork(this::getHighScores);
  
        scope.join()
            .throwIfFailed();
  
        return new Result(userScoreSubtask.get(), highScoresSubtask.get());
      }
    }
  
    private void runExample() throws ExecutionException, InterruptedException, TimeoutException {
      Result result = getData();
      System.out.println("User score: " + result.userScore);
      System.out.println("High scores: " + result.highScores);
    }
  
    public static void main(String[] args) {
      ShutDownOnFailureExample example = new ShutDownOnFailureExample();
  
      try {
        example.runExample();
      } catch (ExecutionException | InterruptedException | TimeoutException e) {
        throw new RuntimeException(e);
      }
    }
  }

Output:

  getHighScores will be delayed for 50 ms
  getUserScore will be delayed for 100 ms
  User score: 50
  High scores: [HighScore[user=Player A, score=100], HighScore[user=Player B, score=90], HighScore[user=Player C, score=80]]

If there is no exception thrown, the call to throwIfFailed() will not throw any exception as well. The result of each subtask can be obtained by using the get() method.

Now we change one of the subtasks to throw an exception.

  private Integer getUserScore() throws InterruptedException {
    int delay = 100;
    System.out.println("getUserScore will be delayed for " + delay + " ms");
    Thread.sleep(delay);
    throw new RuntimeException("getUserScore error");
  }

Output:

  getUserScore will be delayed for 100 ms
  getHighScores will be delayed for 50 ms
  java.lang.RuntimeException: getUserScore error

As a result, an exception is thrown because of the call to the throwIfFailed method. In case the throwIfFailed method is not called, it won't throw the exception. However, if you try to get the result of a subtask whose state is not SUCCESS, you'll get an IllegalStateException.

Another alternative is to use the throwIfFailed(Function<Throwable, ? extends X> esf) method, which has one parameter to pass a function which can return any exception that you want. When there is a failed subtask, the function will be invoked with the first exception thrown as the argument.

  scope.join()
      .throwIfFailed((ex) -> {
        throw new RuntimeException("Wrapped exception", ex);
      });

If you want to get the exception without rethrowing it or mapping it to another exception, you can call the exception() method.

  Optional<Throwable> exception = scope.join()
      .exception();
  exception.ifPresent(Throwable::printStackTrace);

Below is another case where both subtasks throw an exception.

  private Integer getUserScore() throws InterruptedException {
    int delay = 100;
    System.out.println("getUserScore will be delayed for " + delay + " ms");
    Thread.sleep(delay);
    throw new RuntimeException("getUserScore error");
  }

  private List<HighScore> getHighScores() throws InterruptedException {
    int delay = 50;
    System.out.println("getHighScores will be delayed for " + delay + " ms");
    Thread.sleep(delay);
    throw new RuntimeException("getHighScores error");
  }

Output:

  getHighScores will be delayed for 50 ms
  getUserScore will be delayed for 100 ms
  Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: getHighScores error

Because highScoresSubtask throws the exception before userScoreSubtask, the exception of highScoresSubtask is the one that will be thrown when calling throwIfFailed. The same also applies if you call the exception method. In fact, most likely the userScoreSubtask is already interrupted before throwing an exception.

ShutdownOnSuccess Policy

This policy calls the shutdown() method when any of the subtasks has completed with a result. As it happens, the unfinished threads are interrupted and the task scope owner is awakened. You should use this policy for cases where you only need to get the result from any subtask that completes the quickest.

The ShutdownOnSuccess class provides several methods called result for returning the result of the first subtask.

  public T result() throws ExecutionException
  public <X extends Throwable> T result(Function<Throwable, ? extends X> esf) throws X

There are several rules for calling the result methods.

  • The methods can only be called by the scope owner. Otherwise, it will throw a WrongThreadException.
  • The methods can only be called after the join() or joinUntil(Instant) method has been called. Otherwise, it will throw an IllegalStateException.
  • If there's no subtask completed, the methods will throw an IllegalStateException.
  • The one without parameter will throw an ExecutionException if no subtasks completed but at least one task failed. Meanwhile, the one with a parameter allows you to throw any exception by passing a function.

The ShutdownOnSuccess class has a parameterized type which is used to specify the type of the value returned by the subtasks, since it's quite common for the subtasks to have the same return type. If you don't specify the type, the return type of the result methods will be an Object.

In the example below, we have two subtasks for getting an opponent. We only want to get one result from the quickest one.

  public class ShutDownOnSuccessExample {
  
    record Player(String id, String name) {}
  
    private Player getOpponent1Subtask() throws InterruptedException {
      int delay = 100;
      System.out.println("getOpponent1 will be delayed for " + delay + " ms");
      Thread.sleep(delay);
      System.out.println("getOpponent1Subtask - done");
      return new Player("A", "Player A");
    }

    private Player getOpponent2() throws InterruptedException {
      int delay = 50;
      System.out.println("getOpponent2 will be delayed for " + delay + " ms");
      Thread.sleep(delay);
      System.out.println("getOpponent2 - done");
      return new Player("B", "Player B");
    }
  
    Player getOpponent() throws ExecutionException, InterruptedException {
      try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Player>()) {
        StructuredTaskScope.Subtask<Player> getOpponent1Subtask = scope.fork(this::getOpponent1Subtask);
        StructuredTaskScope.Subtask<Player> getOpponent2Subtask = scope.fork(this::getOpponent2);
  
        System.out.println("-----Before join-----");
        System.out.println("getOpponent1Subtask - state: " + getOpponent1Subtask.state());
        System.out.println("getOpponent2Subtask - state: " + getOpponent2Subtask.state());
  
        Player opponent = scope.join()
            .result();
        System.out.println(opponent);
  
        System.out.println("-----After join-----");
        System.out.println("getOpponent1Subtask - state: " + getOpponent1Subtask.state());
        System.out.println("getOpponent2Subtask - state: " + getOpponent2Subtask.state());
  
        return opponent;
      }
    }
  
    private void runExample() throws ExecutionException, InterruptedException {
      Player opponent = getOpponent();
    }
  
    public static void main(String[] args) {
      ShutDownOnSuccessExample example = new ShutDownOnSuccessExample();
  
      try {
        example.runExample();
      } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  }

Output:

  -----Before join-----
  getOpponent1Subtask - state: UNAVAILABLE
  getOpponent2Subtask - state: UNAVAILABLE
  getOpponent2 will be delayed for 50 ms
  getOpponent1 will be delayed for 100 ms
  getOpponent2 - done
  Player[id=B, name=Player B]
  -----After join-----
  getOpponent1Subtask - state: UNAVAILABLE
  getOpponent2Subtask - state: SUCCESS

From the output, we can see that the initial state of both subtasks are UNAVAILABLE. Then, it turns out that the getOpponent2Subtask completed faster since it has a shorter delay time. As a result, the result returned by the result method is the one from the getOpponent2Subtask. The getOpponent1Subtask is interrupted and hence it doesn't complete which can be seen that the 'getOpponent1Subtask - done' text is not printed. After scope is shut down, the state of the getOpponent2Subtask becomes SUCCESS, while the state of the getOpponent1Subtask remains UNAVAILABLE.

Now, we want to see what will happen if one of the subtasks fails. For example, we change the getOpponent2 method to throw an exception.

  private Player getOpponent1Subtask() throws InterruptedException {
    int delay = 100;
    System.out.println("getOpponent1Subtask will be delayed for " + delay + " ms");
    Thread.sleep(delay);
    System.out.println("getOpponent1Subtask - done");
    return new Player("A", "Player A");
  }

  private Player getOpponent2() throws InterruptedException {
    int delay = 50;
    System.out.println("getOpponent2 will be delayed for " + delay + " ms");
    Thread.sleep(delay);
    throw new RuntimeException("getOpponent2 error");
  }

Output:

  -----Before join-----
  getOpponent1Subtask - state: UNAVAILABLE
  getOpponent2Subtask - state: UNAVAILABLE
  getOpponent2 will be delayed for 50 ms
  getOpponent1 will be delayed for 100 ms
  getOpponent1Subtask - done
  Player[id=A, name=Player A]
  -----After join-----
  getOpponent1Subtask - state: SUCCESS
  getOpponent2Subtask - state: FAILED

If there is a failed subtask, the others will not be interrupted. In addition, the result method is still able to return a value as long as there is a subtask that executed successfully.

Now, let's see what's going to happen if both subtasks are failed.

  private Player getOpponent1Subtask() throws InterruptedException {
    int delay = 100;
    System.out.println("getOpponent1Subtask will be delayed for " + delay + " ms");
    Thread.sleep(delay);
    throw new RuntimeException("getOpponent1Subtask error");
  }

  private Player getOpponent2() throws InterruptedException {
    int delay = 50;
    System.out.println("getOpponent2 will be delayed for " + delay + " ms");
    Thread.sleep(delay);
    throw new RuntimeException("getOpponent2 error");
  }

Output:

  -----Before join-----
  getOpponent1Subtask - state: UNAVAILABLE
  getOpponent2Subtask - state: UNAVAILABLE
  getOpponent2 will be delayed for 50 ms
  getOpponent1 will be delayed for 100 ms
  Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: getOpponent2 error
      at com.woolha.example.structuredconcurrency.ShutDownOnSuccessExample.main(ShutDownOnSuccessExample.java:60)

Because there is no result available, the call to result causes an exception. Java will throw the exception that occurred first. In the example above, the exception from getOpponent2Subtask is the one that's thrown because it's thrown before the exception from getOpponent1Subtask.

Alternatively, you can change the code to use the result(Function<Throwable, ? extends X> esf) method, which accepts a function that allows you to throw any exception that you want. If no subtask is successful and there is any subtask throwing an exception, the passed function will be invoked with an argument whose value is the first exception thrown by the subtasks.

  Player opponent = scope.join()
      .result((ex) -> { // ex is the first exception thrown
        throw new RuntimeException("Wrapper exception", ex);
      });

Custom Policy

It's also possible to create a custom policy that use your own logic. You can create a class that extends StructuredTaskScope. Most likely, you need to override the handleComplete method. It's the method to be invoked when each sub task completed, either success or failed. Inside, you can write any code to determine when to call the shutdown method. You may also need to override the join() and joinUntil(Instant deadline) methods to return an object whose type is the custom class. In addition, you can also add additional methods.

The example below is a custom policy that calls the shutdown method after all subtasks have been completed. It also stores the result of successful subtasks in a List.

  static class MyCustomScope <T> extends StructuredTaskScope<T> {

    private final Queue<T> results = new ConcurrentLinkedQueue<>();
    private final int numberOfTasks;
    private int completedCount = 0;

    MyCustomScope(int numberOfTasks) {
      super(null, Thread.ofVirtual().factory());
      this.numberOfTasks = numberOfTasks;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
      System.out.println("handleComplete");
      System.out.println(subtask.state());
      if (subtask.state() == Subtask.State.SUCCESS) {
        this.results.add(subtask.get());
      }

      this.completedCount++;

      if (this.completedCount == this.numberOfTasks) {
        System.out.println("xxx1");
        super.shutdown();
      } else {
        System.out.println("xxx2");
      }
    }

    @Override
    public MyCustomScope<T> join() throws InterruptedException {
      super.join();
      return this;
    }

    @Override
    public MyCustomScope<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException {
      super.joinUntil(deadline);
      return this;
    }

    public List<T> results() {
      super.ensureOwnerAndJoined();
      return new ArrayList<>(results);
    }
  }

Summary

In this tutorial, we have learned how to create structured concurrency in Java using StructuredTaskScope. You can control when the threads can be canceled by selecting the shutdown policy to be used. There are some built-in policies which include ShutdownOnSuccess and ShutdownOnFailure. You can also create a custom policy if necessary.

If this feature is still being a preview feature in the Java version that you use, you have to add --enable-preview to run the code.

You can also read about: