1 | package com.pivovarit.collectors; | |
2 | ||
3 | import java.util.concurrent.BlockingQueue; | |
4 | import java.util.concurrent.CompletableFuture; | |
5 | import java.util.concurrent.Executor; | |
6 | import java.util.concurrent.ExecutorService; | |
7 | import java.util.concurrent.Executors; | |
8 | import java.util.concurrent.FutureTask; | |
9 | import java.util.concurrent.LinkedBlockingQueue; | |
10 | import java.util.concurrent.RejectedExecutionException; | |
11 | import java.util.concurrent.Semaphore; | |
12 | import java.util.concurrent.ThreadFactory; | |
13 | import java.util.concurrent.atomic.AtomicBoolean; | |
14 | import java.util.function.Function; | |
15 | import java.util.function.Supplier; | |
16 | ||
17 | import static com.pivovarit.collectors.Preconditions.requireValidExecutor; | |
18 | ||
19 | /** | |
20 | * @author Grzegorz Piwowarek | |
21 | */ | |
22 | final class Dispatcher<T> { | |
23 | ||
24 | private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?"); | |
25 | ||
26 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); | |
27 | private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>(); | |
28 | ||
29 | private final ThreadFactory dispatcherThreadFactory = Thread::startVirtualThread; | |
30 | ||
31 | private final Executor executor; | |
32 | private final Semaphore limiter; | |
33 | ||
34 | private final AtomicBoolean started = new AtomicBoolean(false); | |
35 | ||
36 | private volatile boolean shortCircuited = false; | |
37 | ||
38 | private Dispatcher() { | |
39 | this.executor = defaultExecutorService(); | |
40 | this.limiter = null; | |
41 | } | |
42 | ||
43 | private Dispatcher(Executor executor, int permits) { | |
44 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
45 | this.executor = executor; | |
46 | this.limiter = new Semaphore(permits); | |
47 | } | |
48 | ||
49 | private Dispatcher(int permits) { | |
50 | this.executor = defaultExecutorService(); | |
51 | this.limiter = new Semaphore(permits); | |
52 | } | |
53 | ||
54 | private Dispatcher(Executor executor) { | |
55 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
56 | this.executor = executor; | |
57 | this.limiter = null; | |
58 | } | |
59 | ||
60 | static <T> Dispatcher<T> from(Executor executor) { | |
61 |
1
1. from : replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED |
return new Dispatcher<>(executor); |
62 | } | |
63 | ||
64 | static <T> Dispatcher<T> from(Executor executor, int permits) { | |
65 |
1
1. from : replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED |
return new Dispatcher<>(executor, permits); |
66 | } | |
67 | ||
68 | static <T> Dispatcher<T> virtual() { | |
69 |
1
1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED |
return new Dispatcher<>(); |
70 | } | |
71 | ||
72 | static <T> Dispatcher<T> virtual(int permits) { | |
73 |
1
1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED |
return new Dispatcher<>(permits); |
74 | } | |
75 | ||
76 | void start() { | |
77 |
1
1. start : negated conditional → TIMED_OUT |
if (!started.getAndSet(true)) { |
78 | dispatcherThreadFactory.newThread(() -> { | |
79 | try { | |
80 | while (true) { | |
81 | try { | |
82 |
1
1. lambda$start$3 : negated conditional → KILLED |
if (limiter != null) { |
83 |
1
1. lambda$start$3 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); |
84 | } | |
85 | } catch (InterruptedException e) { | |
86 |
1
1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE |
handle(e); |
87 | } | |
88 | Runnable task; | |
89 |
1
1. lambda$start$3 : negated conditional → TIMED_OUT |
if ((task = workingQueue.take()) != POISON_PILL) { |
90 |
2
1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT 2. lambda$start$2 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT |
retry(() -> executor.execute(() -> { |
91 | try { | |
92 |
1
1. lambda$start$1 : removed call to java/lang/Runnable::run → TIMED_OUT |
task.run(); |
93 | } finally { | |
94 |
1
1. lambda$start$1 : negated conditional → TIMED_OUT |
if (limiter != null) { |
95 |
1
1. lambda$start$1 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
limiter.release(); |
96 | } | |
97 | } | |
98 | })); | |
99 | } else { | |
100 | break; | |
101 | } | |
102 | } | |
103 | } catch (Throwable e) { | |
104 |
1
1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::handle → KILLED |
handle(e); |
105 | } | |
106 | }); | |
107 | } | |
108 | } | |
109 | ||
110 | void stop() { | |
111 | try { | |
112 |
1
1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED |
workingQueue.put(POISON_PILL); |
113 | } catch (InterruptedException e) { | |
114 | completionSignaller.completeExceptionally(e); | |
115 | } | |
116 | } | |
117 | ||
118 | boolean isRunning() { | |
119 |
2
1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED 2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT |
return started.get(); |
120 | } | |
121 | ||
122 | CompletableFuture<T> enqueue(Supplier<T> supplier) { | |
123 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
124 | workingQueue.add(completionTask(supplier, future)); | |
125 | completionSignaller.exceptionally(shortcircuit(future)); | |
126 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return future; |
127 | } | |
128 | ||
129 | private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) { | |
130 | FutureTask<Void> task = new FutureTask<>(() -> { | |
131 | try { | |
132 |
1
1. lambda$completionTask$4 : negated conditional → TIMED_OUT |
if (!shortCircuited) { |
133 | future.complete(supplier.get()); | |
134 | } | |
135 | } catch (Throwable e) { | |
136 |
1
1. lambda$completionTask$4 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT |
handle(e); |
137 | } | |
138 | }, null); | |
139 |
1
1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
future.completedBy(task); |
140 |
1
1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
return task; |
141 | } | |
142 | ||
143 | private void handle(Throwable e) { | |
144 | shortCircuited = true; | |
145 | completionSignaller.completeExceptionally(e); | |
146 | } | |
147 | ||
148 | private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) { | |
149 |
1
1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
return throwable -> { |
150 | future.completeExceptionally(throwable); | |
151 | future.cancel(true); | |
152 | return null; | |
153 | }; | |
154 | } | |
155 | ||
156 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { | |
157 | ||
158 | private volatile FutureTask<?> backingTask; | |
159 | ||
160 | private void completedBy(FutureTask<Void> task) { | |
161 | backingTask = task; | |
162 | } | |
163 | ||
164 | @Override | |
165 | public boolean cancel(boolean mayInterruptIfRunning) { | |
166 |
1
1. cancel : negated conditional → KILLED |
if (backingTask != null) { |
167 | backingTask.cancel(mayInterruptIfRunning); | |
168 | } | |
169 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); |
170 | } | |
171 | } | |
172 | ||
173 | private static ExecutorService defaultExecutorService() { | |
174 |
1
1. defaultExecutorService : replaced return value with null for com/pivovarit/collectors/Dispatcher::defaultExecutorService → KILLED |
return Executors.newVirtualThreadPerTaskExecutor(); |
175 | } | |
176 | ||
177 | private static void retry(Runnable runnable) { | |
178 | try { | |
179 |
1
1. retry : removed call to java/lang/Runnable::run → TIMED_OUT |
runnable.run(); |
180 | } catch (RejectedExecutionException e) { | |
181 |
1
1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED |
Thread.onSpinWait(); |
182 |
1
1. retry : removed call to java/lang/Runnable::run → KILLED |
runnable.run(); |
183 | } | |
184 | } | |
185 | } | |
Mutations | ||
44 |
1.1 |
|
55 |
1.1 |
|
61 |
1.1 |
|
65 |
1.1 |
|
69 |
1.1 |
|
73 |
1.1 |
|
77 |
1.1 |
|
82 |
1.1 |
|
83 |
1.1 |
|
86 |
1.1 |
|
89 |
1.1 |
|
90 |
1.1 2.2 |
|
92 |
1.1 |
|
94 |
1.1 |
|
95 |
1.1 |
|
104 |
1.1 |
|
112 |
1.1 |
|
119 |
1.1 2.2 |
|
126 |
1.1 |
|
132 |
1.1 |
|
136 |
1.1 |
|
139 |
1.1 |
|
140 |
1.1 |
|
149 |
1.1 |
|
166 |
1.1 |
|
169 |
1.1 2.2 |
|
174 |
1.1 |
|
179 |
1.1 |
|
181 |
1.1 |
|
182 |
1.1 |