1 | package com.pivovarit.collectors; | |
2 | ||
3 | import java.util.concurrent.BlockingQueue; | |
4 | import java.util.concurrent.CompletableFuture; | |
5 | import java.util.concurrent.Executor; | |
6 | import java.util.concurrent.ExecutorService; | |
7 | import java.util.concurrent.Executors; | |
8 | import java.util.concurrent.FutureTask; | |
9 | import java.util.concurrent.LinkedBlockingQueue; | |
10 | import java.util.concurrent.RejectedExecutionException; | |
11 | import java.util.concurrent.Semaphore; | |
12 | import java.util.concurrent.ThreadPoolExecutor; | |
13 | import java.util.concurrent.atomic.AtomicBoolean; | |
14 | import java.util.function.Function; | |
15 | import java.util.function.Supplier; | |
16 | ||
17 | /** | |
18 | * @author Grzegorz Piwowarek | |
19 | */ | |
20 | final class Dispatcher<T> { | |
21 | ||
22 |
1
1. lambda$static$0 : removed call to java/io/PrintStream::println → NO_COVERAGE |
private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?"); |
23 | ||
24 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); | |
25 | private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>(); | |
26 | ||
27 | private final Executor executor; | |
28 | private final Semaphore limiter; | |
29 | ||
30 | private final AtomicBoolean started = new AtomicBoolean(false); | |
31 | ||
32 | private volatile boolean shortCircuited = false; | |
33 | ||
34 | private Dispatcher() { | |
35 | this.executor = defaultExecutorService(); | |
36 | this.limiter = null; | |
37 | } | |
38 | ||
39 | private Dispatcher(Executor executor, int permits) { | |
40 |
1
1. <init> : removed call to com/pivovarit/collectors/Dispatcher::requireValidExecutor → KILLED |
requireValidExecutor(executor); |
41 | this.executor = executor; | |
42 | this.limiter = new Semaphore(permits); | |
43 | } | |
44 | ||
45 | private Dispatcher(int permits) { | |
46 | this.executor = defaultExecutorService(); | |
47 | this.limiter = new Semaphore(permits); | |
48 | } | |
49 | ||
50 | static <T> Dispatcher<T> from(Executor executor, int permits) { | |
51 |
1
1. from : replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED |
return new Dispatcher<>(executor, permits); |
52 | } | |
53 | ||
54 | static <T> Dispatcher<T> virtual() { | |
55 |
1
1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED |
return new Dispatcher<>(); |
56 | } | |
57 | ||
58 | static <T> Dispatcher<T> virtual(int permits) { | |
59 |
1
1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED |
return new Dispatcher<>(permits); |
60 | } | |
61 | ||
62 | void start() { | |
63 |
1
1. start : negated conditional → TIMED_OUT |
if (!started.getAndSet(true)) { |
64 | Thread.ofVirtual().start(() -> { | |
65 | try { | |
66 | while (true) { | |
67 | try { | |
68 |
1
1. lambda$start$3 : negated conditional → KILLED |
if (limiter != null) { |
69 |
1
1. lambda$start$3 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); |
70 | } | |
71 | } catch (InterruptedException e) { | |
72 |
1
1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE |
handle(e); |
73 | } | |
74 | Runnable task; | |
75 |
1
1. lambda$start$3 : negated conditional → TIMED_OUT |
if ((task = workingQueue.take()) != POISON_PILL) { |
76 |
1
1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT |
retry(() -> { |
77 |
1
1. lambda$start$2 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT |
executor.execute(() -> { |
78 | try { | |
79 |
1
1. lambda$start$1 : removed call to java/lang/Runnable::run → TIMED_OUT |
task.run(); |
80 | } finally { | |
81 |
1
1. lambda$start$1 : negated conditional → TIMED_OUT |
if (limiter != null) { |
82 |
1
1. lambda$start$1 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
limiter.release(); |
83 | } | |
84 | } | |
85 | }); | |
86 | }); | |
87 | } else { | |
88 | break; | |
89 | } | |
90 | } | |
91 | } catch (Throwable e) { | |
92 |
1
1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT |
handle(e); |
93 | } | |
94 | }); | |
95 | } | |
96 | } | |
97 | ||
98 | void stop() { | |
99 | try { | |
100 |
1
1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED |
workingQueue.put(POISON_PILL); |
101 | } catch (InterruptedException e) { | |
102 | completionSignaller.completeExceptionally(e); | |
103 | } | |
104 | } | |
105 | ||
106 | boolean isRunning() { | |
107 |
2
1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED 2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT |
return started.get(); |
108 | } | |
109 | ||
110 | CompletableFuture<T> enqueue(Supplier<T> supplier) { | |
111 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
112 | workingQueue.add(completionTask(supplier, future)); | |
113 | completionSignaller.exceptionally(shortcircuit(future)); | |
114 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return future; |
115 | } | |
116 | ||
117 | private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) { | |
118 | FutureTask<Void> task = new FutureTask<>(() -> { | |
119 | try { | |
120 |
1
1. lambda$completionTask$4 : negated conditional → TIMED_OUT |
if (!shortCircuited) { |
121 | future.complete(supplier.get()); | |
122 | } | |
123 | } catch (Throwable e) { | |
124 |
1
1. lambda$completionTask$4 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT |
handle(e); |
125 | } | |
126 | }, null); | |
127 |
1
1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
future.completedBy(task); |
128 |
1
1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
return task; |
129 | } | |
130 | ||
131 | private void handle(Throwable e) { | |
132 | shortCircuited = true; | |
133 | completionSignaller.completeExceptionally(e); | |
134 | } | |
135 | ||
136 | private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) { | |
137 |
1
1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
return throwable -> { |
138 | future.completeExceptionally(throwable); | |
139 | future.cancel(true); | |
140 | return null; | |
141 | }; | |
142 | } | |
143 | ||
144 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { | |
145 | ||
146 | private volatile FutureTask<?> backingTask; | |
147 | ||
148 | private void completedBy(FutureTask<Void> task) { | |
149 | backingTask = task; | |
150 | } | |
151 | ||
152 | @Override | |
153 | public boolean cancel(boolean mayInterruptIfRunning) { | |
154 |
1
1. cancel : negated conditional → KILLED |
if (backingTask != null) { |
155 | backingTask.cancel(mayInterruptIfRunning); | |
156 | } | |
157 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); |
158 | } | |
159 | } | |
160 | ||
161 | private static ExecutorService defaultExecutorService() { | |
162 |
1
1. defaultExecutorService : replaced return value with null for com/pivovarit/collectors/Dispatcher::defaultExecutorService → KILLED |
return Executors.newVirtualThreadPerTaskExecutor(); |
163 | } | |
164 | ||
165 | private static void requireValidExecutor(Executor executor) { | |
166 |
1
1. requireValidExecutor : negated conditional → KILLED |
if (executor instanceof ThreadPoolExecutor tpe) { |
167 | switch (tpe.getRejectedExecutionHandler()) { | |
168 | case ThreadPoolExecutor.DiscardPolicy __ -> | |
169 | throw new IllegalArgumentException("Executor's RejectedExecutionHandler can't discard tasks"); | |
170 | case ThreadPoolExecutor.DiscardOldestPolicy __ -> | |
171 | throw new IllegalArgumentException("Executor's RejectedExecutionHandler can't discard tasks"); | |
172 | default -> { | |
173 | // no-op | |
174 | } | |
175 | } | |
176 | } | |
177 | } | |
178 | ||
179 | private static void retry(Runnable runnable) { | |
180 | try { | |
181 |
1
1. retry : removed call to java/lang/Runnable::run → TIMED_OUT |
runnable.run(); |
182 | } catch (RejectedExecutionException e) { | |
183 |
1
1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED |
Thread.onSpinWait(); |
184 |
1
1. retry : removed call to java/lang/Runnable::run → TIMED_OUT |
runnable.run(); |
185 | } | |
186 | } | |
187 | } | |
Mutations | ||
22 |
1.1 |
|
40 |
1.1 |
|
51 |
1.1 |
|
55 |
1.1 |
|
59 |
1.1 |
|
63 |
1.1 |
|
68 |
1.1 |
|
69 |
1.1 |
|
72 |
1.1 |
|
75 |
1.1 |
|
76 |
1.1 |
|
77 |
1.1 |
|
79 |
1.1 |
|
81 |
1.1 |
|
82 |
1.1 |
|
92 |
1.1 |
|
100 |
1.1 |
|
107 |
1.1 2.2 |
|
114 |
1.1 |
|
120 |
1.1 |
|
124 |
1.1 |
|
127 |
1.1 |
|
128 |
1.1 |
|
137 |
1.1 |
|
154 |
1.1 |
|
157 |
1.1 2.2 |
|
162 |
1.1 |
|
166 |
1.1 |
|
181 |
1.1 |
|
183 |
1.1 |
|
184 |
1.1 |