CompletableFuture
and its quirks
Principal Engineer @ Mi|iM
Async
...but how to get the result back?
Future
Spawn point for results of async operations
Future<Integer> result = someAsyncMethod();
// ...
Integer integer = result.get();
Any method can become async
Future<Integer> result = executor.submit(() -> someSyncMethod());
Integer integer = result.get();
Java Future APIs:
- java.util.concurrent.Future (JDK 1.5)
- java.util.concurrent.CompletableFuture (JDK 1.8)
Future's Limitations
- Blocking API
- Limited exception handling
- A Future can't be manually completed
- Multiple Futures can't be combined
Future<Integer> result = executor.submit(() -> ...);
try {
Integer integer = result.get(); // blocks
} catch (InterruptedException e) {
// handle
} catch (ExecutionException e) {
// handle
}
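A minimal sketch of the blocking problem, assuming a hypothetical 200 ms task (SLOW_TASK and tryGet are names made up for this example). The only way to avoid waiting forever is the timed get(), and the caller still has to sit in the call and handle three checked exceptions:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingFutureDemo {

    // Hypothetical slow task: sleeps for 200 ms, then returns 42.
    static final Callable<Integer> SLOW_TASK = () -> {
        Thread.sleep(200);
        return 42;
    };

    // Waits at most timeoutMillis for the result and reports what happened.
    static String tryGet(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> result = executor.submit(SLOW_TASK);
            // The calling thread is parked here until a result or the timeout.
            return "value=" + result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            executor.shutdownNow(); // interrupts the task if it is still running
        }
    }

    public static void main(String[] args) {
        System.out.println(tryGet(20));   // shorter than the task
        System.out.println(tryGet(2000)); // longer than the task
    }
}
```

There is no way to say "call me when it's done" with a plain Future; the caller either blocks or polls isDone().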
Before JDK 8: Guava's ListenableFuture
ListenableFuture<Result> future = service.query(name);
future.addListener(new Runnable() {
public void run() {
// ...
}
}, executor);
JDK 8: CompletableFuture
CompletableFuture<Integer> cf = new CompletableFuture<>();
cf.complete(1);
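A small sketch of what manual completion looks like in practice (class and method names are made up for illustration). Any thread can complete the future, and only the first completion wins:

```java
import java.util.concurrent.CompletableFuture;

final class ManualCompletionDemo {

    // Completes the future from a separate thread and waits for the value.
    static int completeFromAnotherThread() {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        Thread producer = new Thread(() -> cf.complete(1));
        producer.start();
        return cf.join(); // returns once the producer thread has completed cf
    }

    // complete() returns false when the future was already completed.
    static boolean secondCompleteWins() {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        cf.complete(1);
        return cf.complete(2);
    }

    public static void main(String[] args) {
        System.out.println(completeFromAnotherThread());
        System.out.println(secondCompleteWins());
    }
}
```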
CompletableFuture<Integer> cf1 = ...
CompletableFuture<Integer> cf2 = ...
CompletableFuture<Void> processed = cf1
.thenApply(i -> i + 1)
.thenCombine(cf2, Integer::sum)
.thenRun(() -> { /*...*/});
Cancelling a CompletableFuture
cf.cancel(true);
CompletableFuture<Void> processed = cf1
.thenApply(i -> i + 1)
.thenCombine(cf2, Integer::sum)
.thenRun(() -> { /*...*/}); // what if I cancel this one?
CompletableFuture<Integer> cf = new CompletableFuture<>();
// ...
cf.cancel(true);
Cancels the CompletableFuture itself (it completes with a CancellationException), not the task that was going to complete it
// @param mayInterruptIfRunning this value has no effect in this
// implementation because interrupts are not used to control
// processing.
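A sketch that makes this visible, assuming a task that sleeps for 200 ms on a single-thread executor (all names here are made up for the demo). The future reports itself as cancelled, yet the task is never interrupted and runs to the end:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDemo {

    // Returns true when the future is cancelled but the task still finished normally.
    static boolean taskSurvivesCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
                started.countDown();
                try {
                    Thread.sleep(200);
                    finished.set(true);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                }
            }, executor);

            started.await();   // make sure the task is already running
            cf.cancel(true);   // the flag is ignored by CompletableFuture

            executor.shutdown(); // lets the running task finish, no interrupt
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return cf.isCancelled() && finished.get() && !interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(taskSurvivesCancel());
    }
}
```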
Technically, you could hack it...
static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {
private volatile FutureTask<?> backingTask;
private void completedBy(FutureTask<Void> task) {
backingTask = task;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (backingTask != null) {
backingTask.cancel(mayInterruptIfRunning);
}
return super.cancel(mayInterruptIfRunning);
}
}
Combining Multiple Futures...
CompletableFuture#allOf/anyOf
CompletableFuture<Integer> cf1 = new CompletableFuture<>();
CompletableFuture<Integer> cf2 = new CompletableFuture<>();
CompletableFuture.allOf(cf1, cf2);
CompletableFuture.anyOf(cf1, cf2);
CompletableFuture#allOf
public static CompletableFuture<Void> allOf(
CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
- Accepts CompletableFuture<?>... cfs
- Returns CompletableFuture<Void>
- Doesn't short-circuit on exception
Can you spot the issue?
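One possible workaround, sketched under the assumption that all inputs share a result type. allAsList is a hypothetical helper, not a JDK method: it keeps the values as a typed list and fails as soon as any input fails, instead of waiting for the rest.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class AllOfDemo {

    // Hypothetical helper: typed results plus fail-fast behaviour on top of allOf.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // Every input is done at this point, so join() does not block.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        // Short-circuit: the first failure completes the result right away.
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(2);
        System.out.println(allAsList(Arrays.asList(cf1, cf2)).join());
    }
}
```

Note that the remaining inputs keep running after a failure; only the combined future stops waiting for them.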
CompletableFuture#anyOf
public static CompletableFuture<Object> anyOf(
CompletableFuture<?>... cfs) {
return // ...
}
- Accepts CompletableFuture<?>... cfs
- Returns CompletableFuture<Object>
- Completes with whichever future finishes first, even a failed one; doesn't wait for the first successful completion
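A sketch of the alternative one might want instead. firstSuccessful is a hypothetical helper, not part of the JDK: it completes with the first successful value and fails only once every input has failed. With an empty list it never completes, which a real implementation would need to handle.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class AnyOfDemo {

    // Hypothetical helper: first successful value wins, failure only if all fail.
    static <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger failuresLeft = new AtomicInteger(futures.size());
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value);
                } else if (failuresLeft.decrementAndGet() == 0) {
                    // Every input failed; report the last failure seen.
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new NullPointerException());
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(42);

        // anyOf settles on the failure, the helper keeps waiting for a value.
        System.out.println(CompletableFuture.anyOf(failed, ok).isCompletedExceptionally());
        System.out.println(firstSuccessful(Arrays.asList(failed, ok)).join());
    }
}
```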
CompletableFuture vs Threads
Straightforward with JDK 1.5's Future
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<?> future = executor.submit(() -> { /* ... */ });
With CompletableFuture
CompletableFuture<Void> future = CompletableFuture
.runAsync(() -> { /* ... */ }) // 1
.thenRun(() -> { /* ... */ }) // 2
.thenRun(() -> { /* ... */ });
- What thread pool does #1 run on?
- What thread does #2 run on?
Following CompletableFuture#runAsync

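ThreadPerTaskExecutor (a new thread for every task) when the common pool's parallelism is below 2; ForkJoinPool.commonPool() otherwise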

- ForkJoinPool.commonPool() is not much better either
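A quick way to check what the default is on a given machine, as a sketch (the class and method names are made up). Per the CompletableFuture Javadoc, async methods without an Executor argument use ForkJoinPool.commonPool() unless it does not support a parallelism level of at least two, in which case each task gets a new thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

final class DefaultExecutorDemo {

    // Name of the thread that runs a task submitted without an explicit Executor.
    static String defaultAsyncThreadName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("default async thread: " + defaultAsyncThreadName());
    }
}
```

On a container limited to a single CPU the second line will usually not name a common pool worker at all.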
ForkJoinPool.commonPool()
- Just a single instance
- Good for non-blocking operations
- Used by Stream API
- Used by Arrays.parallel*
- Used by Project Loom
list.stream()
.collect(parallel(i -> foo(i), toList(), executor, parallelism))
.orTimeout(1000, MILLISECONDS)
.thenAcceptAsync(System.out::println, otherExecutor)
.thenRun(() -> System.out.println("Finished!"));
Always provide an Executor instance
CompletableFuture<Void> future = CompletableFuture
.runAsync(() -> { /* ... */ }, executor) // 1
.thenRun(() -> { /* ... */ }) // 2
.thenRun(() -> { /* ... */ });
- What thread does #2 run on?
- Either the thread that completed #1, or the calling thread if #1 had already completed
- Not an issue for simple non-blocking processing
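Both cases can be reproduced deterministically. In this sketch (names such as "pool-thread" and threadOfDependent are made up) a latch holds stage #1 open so we control whether it is finished before thenRun is attached:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class DependentThreadDemo {

    // Returns the name of the thread that ran the thenRun stage.
    static String threadOfDependent(boolean firstAlreadyCompleted) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "pool-thread"));
        try {
            CountDownLatch gate = new CountDownLatch(1);
            CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
                try {
                    gate.await(); // stage #1 stays pending until the gate opens
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, executor);

            if (firstAlreadyCompleted) {
                gate.countDown();
                first.join(); // #1 is done before #2 is attached
            }

            AtomicReference<String> name = new AtomicReference<>();
            CompletableFuture<Void> second =
                    first.thenRun(() -> name.set(Thread.currentThread().getName()));

            gate.countDown(); // no-op if already opened
            second.join();
            return name.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfDependent(false)); // pool thread runs #2
        System.out.println(threadOfDependent(true));  // calling thread runs #2
    }
}
```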
Solution: then***Async
CompletableFuture<Void> future = CompletableFuture
.runAsync(() -> { /* ... */ }, executor)
.thenRunAsync(() -> { /* ... */ }, executor)
.thenRunAsync(() -> { /* ... */ }, executor);
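A sketch confirming where each stage lands when the Executor is passed everywhere (the pool name "stage-pool" and the class are made up for the demo):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncStagesDemo {

    // Records the thread name of each of the three stages.
    static List<String> stageThreads() {
        ExecutorService executor =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "stage-pool"));
        List<String> names = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    .runAsync(() -> names.add(Thread.currentThread().getName()), executor)
                    .thenRunAsync(() -> names.add(Thread.currentThread().getName()), executor)
                    .thenRunAsync(() -> names.add(Thread.currentThread().getName()), executor)
                    .join();
            return names;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(stageThreads());
    }
}
```

Every stage is submitted to the given pool, so none of them can end up on the calling thread.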
CompletableFuture#applyToEither vs Exceptions
CompletableFuture#applyToEither
CompletableFuture<Integer> cf1 = ...
CompletableFuture<Integer> cf2 = ...
CompletableFuture<Integer> either = cf1.applyToEither(cf2, i -> i);
CompletableFuture#applyToEither
CompletableFuture<Integer> cf1 = completedFuture(42);
CompletableFuture<Integer> cf2 = failedFuture(new NullPointerException());
CompletableFuture<Integer> either = cf1.applyToEither(cf2, i -> i);
CompletableFuture#applyToEither
CompletableFuture<Integer> cf1 = failedFuture(new NullPointerException());
CompletableFuture<Integer> cf2 = completedFuture(42);
CompletableFuture<Integer> either = cf1.applyToEither(cf2, i -> i);
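The two cases above can be run side by side. This is a sketch with made-up helper names (outcome, failed); the failed() helper stands in for failedFuture so the code also compiles on JDK 8. When both futures are already completed, the receiver is checked first, so swapping the arguments changes the result:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ApplyToEitherDemo {

    // JDK 8 friendly stand-in for CompletableFuture.failedFuture.
    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        cf.completeExceptionally(t);
        return cf;
    }

    // Describes how cf1.applyToEither(cf2, ...) ended.
    static String outcome(CompletableFuture<Integer> cf1, CompletableFuture<Integer> cf2) {
        try {
            return "value=" + cf1.applyToEither(cf2, i -> i).join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(42);
        CompletableFuture<Integer> broken = failed(new NullPointerException());

        System.out.println(outcome(ok, broken)); // value=42
        System.out.println(outcome(broken, ok)); // failed: NullPointerException
    }
}
```

A successful result on the other side does not rescue the call: applyToEither takes whichever completion it sees first, failure included.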
Thank You!