1 | package com.pivovarit.collectors; | |
2 | ||
3 | import java.util.concurrent.BlockingQueue; | |
4 | import java.util.concurrent.CompletableFuture; | |
5 | import java.util.concurrent.Executor; | |
6 | import java.util.concurrent.FutureTask; | |
7 | import java.util.concurrent.LinkedBlockingQueue; | |
8 | import java.util.concurrent.RejectedExecutionException; | |
9 | import java.util.concurrent.Semaphore; | |
10 | import java.util.concurrent.ThreadFactory; | |
11 | import java.util.concurrent.atomic.AtomicBoolean; | |
12 | import java.util.function.Function; | |
13 | import java.util.function.Supplier; | |
14 | ||
15 | import static com.pivovarit.collectors.Preconditions.requireValidExecutor; | |
16 | ||
17 | /** | |
18 | * @author Grzegorz Piwowarek | |
19 | */ | |
20 | final class Dispatcher<T> { | |
21 | ||
22 | private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?"); | |
23 | ||
24 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); | |
25 | private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>(); | |
26 | ||
27 | private final ThreadFactory dispatcherThreadFactory = Thread::startVirtualThread; | |
28 | ||
29 | private final Executor executor; | |
30 | private final Semaphore limiter; | |
31 | ||
32 | private final AtomicBoolean started = new AtomicBoolean(false); | |
33 | ||
34 | Dispatcher(Executor executor, int permits) { | |
35 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
36 | this.executor = executor; | |
37 | this.limiter = new Semaphore(permits); | |
38 | } | |
39 | ||
40 | Dispatcher(Executor executor) { | |
41 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
42 | this.executor = executor; | |
43 | this.limiter = null; | |
44 | } | |
45 | ||
46 | void start() { | |
47 |
1
1. start : negated conditional → TIMED_OUT |
if (!started.getAndSet(true)) { |
48 | dispatcherThreadFactory.newThread(() -> { | |
49 | try { | |
50 | while (true) { | |
51 | try { | |
52 |
1
1. lambda$start$0 : negated conditional → KILLED |
if (limiter != null) { |
53 |
1
1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); |
54 | } | |
55 | } catch (InterruptedException e) { | |
56 |
1
1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → NO_COVERAGE |
handleException(e); |
57 | } | |
58 | Runnable task; | |
59 |
1
1. lambda$start$0 : negated conditional → TIMED_OUT |
if ((task = workingQueue.take()) != POISON_PILL) { |
60 |
2
1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT 2. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT |
retry(() -> executor.execute(() -> { |
61 | try { | |
62 |
1
1. lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT |
task.run(); |
63 | } finally { | |
64 |
1
1. lambda$start$2 : negated conditional → TIMED_OUT |
if (limiter != null) { |
65 |
1
1. lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
limiter.release(); |
66 | } | |
67 | } | |
68 | })); | |
69 | } else { | |
70 | break; | |
71 | } | |
72 | } | |
73 | } catch (Throwable e) { | |
74 |
1
1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → KILLED |
handleException(e); |
75 | } | |
76 | }); | |
77 | } | |
78 | } | |
79 | ||
80 | void stop() { | |
81 | try { | |
82 |
1
1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED |
workingQueue.put(POISON_PILL); |
83 | } catch (InterruptedException e) { | |
84 | completionSignaller.completeExceptionally(e); | |
85 | } | |
86 | } | |
87 | ||
88 | boolean isRunning() { | |
89 |
2
1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED 2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT |
return started.get(); |
90 | } | |
91 | ||
92 | CompletableFuture<T> enqueue(Supplier<T> supplier) { | |
93 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
94 | workingQueue.add(completionTask(supplier, future)); | |
95 | completionSignaller.exceptionally(shortcircuit(future)); | |
96 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return future; |
97 | } | |
98 | ||
99 | private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) { | |
100 | FutureTask<Void> task = new FutureTask<>(() -> { | |
101 | try { | |
102 | future.complete(supplier.get()); | |
103 | } catch (Throwable e) { | |
104 |
1
1. lambda$completionTask$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → TIMED_OUT |
handleException(e); |
105 | } | |
106 | }, null); | |
107 |
1
1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
future.completedBy(task); |
108 |
1
1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
return task; |
109 | } | |
110 | ||
111 | private void handleException(Throwable e) { | |
112 | completionSignaller.completeExceptionally(e); | |
113 | ||
114 | for (Runnable runnable : workingQueue) { | |
115 |
1
1. handleException : negated conditional → SURVIVED |
if (runnable instanceof FutureTask<?> task) { |
116 | task.cancel(true); | |
117 | } | |
118 | } | |
119 | } | |
120 | ||
121 | private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) { | |
122 |
1
1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
return throwable -> { |
123 | future.completeExceptionally(throwable); | |
124 | future.cancel(true); | |
125 | return null; | |
126 | }; | |
127 | } | |
128 | ||
129 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { | |
130 | ||
131 | private volatile FutureTask<?> backingTask; | |
132 | ||
133 | private void completedBy(FutureTask<Void> task) { | |
134 | backingTask = task; | |
135 | } | |
136 | ||
137 | @Override | |
138 | public boolean cancel(boolean mayInterruptIfRunning) { | |
139 |
1
1. cancel : negated conditional → KILLED |
if (backingTask != null) { |
140 | backingTask.cancel(mayInterruptIfRunning); | |
141 | } | |
142 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); |
143 | } | |
144 | } | |
145 | ||
146 | private static void retry(Runnable runnable) { | |
147 | try { | |
148 |
1
1. retry : removed call to java/lang/Runnable::run → TIMED_OUT |
runnable.run(); |
149 | } catch (RejectedExecutionException e) { | |
150 |
1
1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED |
Thread.onSpinWait(); |
151 |
1
1. retry : removed call to java/lang/Runnable::run → KILLED |
runnable.run(); |
152 | } | |
153 | } | |
154 | } | |
Mutations | ||
35 |
1.1 |
|
41 |
1.1 |
|
47 |
1.1 |
|
52 |
1.1 |
|
53 |
1.1 |
|
56 |
1.1 |
|
59 |
1.1 |
|
60 |
1.1 2.2 |
|
62 |
1.1 |
|
64 |
1.1 |
|
65 |
1.1 |
|
74 |
1.1 |
|
82 |
1.1 |
|
89 |
1.1 2.2 |
|
96 |
1.1 |
|
104 |
1.1 |
|
107 |
1.1 |
|
108 |
1.1 |
|
115 |
1.1 |
|
122 |
1.1 |
|
139 |
1.1 |
|
142 |
1.1 2.2 |
|
148 |
1.1 |
|
150 |
1.1 |
|
151 |
1.1 |