1 | package com.pivovarit.collectors; | |
2 | ||
3 | import java.util.concurrent.CompletableFuture; | |
4 | import java.util.concurrent.Executor; | |
5 | import java.util.concurrent.Executors; | |
6 | import java.util.concurrent.FutureTask; | |
7 | import java.util.concurrent.Semaphore; | |
8 | import java.util.function.BiConsumer; | |
9 | import java.util.function.Supplier; | |
10 | ||
11 | /** | |
12 | * @author Grzegorz Piwowarek | |
13 | */ | |
14 | final class Dispatcher<T> { | |
15 | ||
16 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); | |
17 | private final Executor executor; | |
18 | private final Semaphore limiter; | |
19 | ||
20 | private Dispatcher() { | |
21 | this.executor = Executors.newVirtualThreadPerTaskExecutor(); | |
22 | this.limiter = null; | |
23 | } | |
24 | ||
25 | private Dispatcher(Executor executor, int permits) { | |
26 | this.executor = executor; | |
27 | this.limiter = new Semaphore(permits); | |
28 | } | |
29 | ||
30 | static <T> Dispatcher<T> from(Executor executor, int permits) { | |
31 |
1
1. from : replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED |
return new Dispatcher<>(executor, permits); |
32 | } | |
33 | ||
34 | static <T> Dispatcher<T> virtual() { | |
35 |
1
1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED |
return new Dispatcher<>(); |
36 | } | |
37 | ||
38 | CompletableFuture<T> enqueue(Supplier<T> supplier) { | |
39 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
40 | completionSignaller.whenComplete(shortcircuit(future)); | |
41 | try { | |
42 |
1
1. enqueue : removed call to java/util/concurrent/Executor::execute → TIMED_OUT |
executor.execute(completionTask(supplier, future)); |
43 | } catch (Throwable e) { | |
44 | completionSignaller.completeExceptionally(e); | |
45 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return CompletableFuture.failedFuture(e); |
46 | } | |
47 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return future; |
48 | } | |
49 | ||
50 | private FutureTask<T> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) { | |
51 | FutureTask<T> task = new FutureTask<>(() -> { | |
52 |
1
1. lambda$completionTask$0 : negated conditional → TIMED_OUT |
if (!completionSignaller.isCompletedExceptionally()) { |
53 | try { | |
54 |
1
1. lambda$completionTask$0 : negated conditional → KILLED |
if (limiter == null) { |
55 | future.complete(supplier.get()); | |
56 | } else { | |
57 | try { | |
58 |
1
1. lambda$completionTask$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); |
59 | future.complete(supplier.get()); | |
60 | } finally { | |
61 |
1
1. lambda$completionTask$0 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
limiter.release(); |
62 | } | |
63 | } | |
64 | } catch (Throwable e) { | |
65 | completionSignaller.completeExceptionally(e); | |
66 | } | |
67 | } | |
68 | }, null); | |
69 |
1
1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
future.completedBy(task); |
70 |
1
1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
return task; |
71 | } | |
72 | ||
73 | private static <T> BiConsumer<T, Throwable> shortcircuit(InterruptibleCompletableFuture<?> future) { | |
74 |
1
1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
return (__, throwable) -> { |
75 |
1
1. lambda$shortcircuit$1 : negated conditional → TIMED_OUT |
if (throwable != null) { |
76 | future.completeExceptionally(throwable); | |
77 | future.cancel(true); | |
78 | } | |
79 | }; | |
80 | } | |
81 | ||
82 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { | |
83 | ||
84 | private volatile FutureTask<T> backingTask; | |
85 | ||
86 | private void completedBy(FutureTask<T> task) { | |
87 | backingTask = task; | |
88 | } | |
89 | ||
90 | @Override | |
91 | public boolean cancel(boolean mayInterruptIfRunning) { | |
92 | var task = backingTask; | |
93 |
1
1. cancel : negated conditional → KILLED |
if (task != null) { |
94 | task.cancel(mayInterruptIfRunning); | |
95 | } | |
96 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); |
97 | } | |
98 | } | |
99 | } | |
Mutations | ||
31 |
1.1 |
|
35 |
1.1 |
|
42 |
1.1 |
|
45 |
1.1 |
|
47 |
1.1 |
|
52 |
1.1 |
|
54 |
1.1 |
|
58 |
1.1 |
|
61 |
1.1 |
|
69 |
1.1 |
|
70 |
1.1 |
|
74 |
1.1 |
|
75 |
1.1 |
|
93 |
1.1 |
|
96 |
1.1 2.2 |