1 | package com.pivovarit.collectors; | |
2 | ||
3 | import java.util.concurrent.BlockingQueue; | |
4 | import java.util.concurrent.CompletableFuture; | |
5 | import java.util.concurrent.Executor; | |
6 | import java.util.concurrent.FutureTask; | |
7 | import java.util.concurrent.LinkedBlockingQueue; | |
8 | import java.util.concurrent.RejectedExecutionException; | |
9 | import java.util.concurrent.Semaphore; | |
10 | import java.util.concurrent.ThreadFactory; | |
11 | import java.util.concurrent.atomic.AtomicBoolean; | |
12 | import java.util.function.Function; | |
13 | import java.util.function.Supplier; | |
14 | ||
15 | import static com.pivovarit.collectors.Preconditions.requireValidExecutor; | |
16 | ||
17 | /** | |
18 | * @author Grzegorz Piwowarek | |
19 | */ | |
20 | final class Dispatcher<T> { | |
21 | ||
22 | private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?"); | |
23 | ||
24 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); | |
25 | private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>(); | |
26 | ||
27 | private final ThreadFactory dispatcherThreadFactory = Thread::startVirtualThread; | |
28 | ||
29 | private final Executor executor; | |
30 | private final Semaphore limiter; | |
31 | ||
32 | private final AtomicBoolean started = new AtomicBoolean(false); | |
33 | ||
34 | private volatile boolean shortCircuited = false; | |
35 | ||
36 | Dispatcher(Executor executor, int permits) { | |
37 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
38 | this.executor = executor; | |
39 | this.limiter = new Semaphore(permits); | |
40 | } | |
41 | ||
42 | Dispatcher(Executor executor) { | |
43 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
44 | this.executor = executor; | |
45 | this.limiter = null; | |
46 | } | |
47 | ||
48 | void start() { | |
49 |
1
1. start : negated conditional → TIMED_OUT |
if (!started.getAndSet(true)) { |
50 | dispatcherThreadFactory.newThread(() -> { | |
51 | try { | |
52 | while (true) { | |
53 | try { | |
54 |
1
1. lambda$start$0 : negated conditional → KILLED |
if (limiter != null) { |
55 |
1
1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); |
56 | } | |
57 | } catch (InterruptedException e) { | |
58 |
1
1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE |
handle(e); |
59 | } | |
60 | Runnable task; | |
61 |
1
1. lambda$start$0 : negated conditional → TIMED_OUT |
if ((task = workingQueue.take()) != POISON_PILL) { |
62 |
2
1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT 2. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT |
retry(() -> executor.execute(() -> { |
63 | try { | |
64 |
1
1. lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT |
task.run(); |
65 | } finally { | |
66 |
1
1. lambda$start$2 : negated conditional → TIMED_OUT |
if (limiter != null) { |
67 |
1
1. lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
limiter.release(); |
68 | } | |
69 | } | |
70 | })); | |
71 | } else { | |
72 | break; | |
73 | } | |
74 | } | |
75 | } catch (Throwable e) { | |
76 |
1
1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → KILLED |
handle(e); |
77 | } | |
78 | }); | |
79 | } | |
80 | } | |
81 | ||
82 | void stop() { | |
83 | try { | |
84 |
1
1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED |
workingQueue.put(POISON_PILL); |
85 | } catch (InterruptedException e) { | |
86 | completionSignaller.completeExceptionally(e); | |
87 | } | |
88 | } | |
89 | ||
90 | boolean isRunning() { | |
91 |
2
1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED 2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT |
return started.get(); |
92 | } | |
93 | ||
94 | CompletableFuture<T> enqueue(Supplier<T> supplier) { | |
95 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
96 | workingQueue.add(completionTask(supplier, future)); | |
97 | completionSignaller.exceptionally(shortcircuit(future)); | |
98 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return future; |
99 | } | |
100 | ||
101 | private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) { | |
102 | FutureTask<Void> task = new FutureTask<>(() -> { | |
103 | try { | |
104 |
1
1. lambda$completionTask$0 : negated conditional → TIMED_OUT |
if (!shortCircuited) { |
105 | future.complete(supplier.get()); | |
106 | } | |
107 | } catch (Throwable e) { | |
108 |
1
1. lambda$completionTask$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT |
handle(e); |
109 | } | |
110 | }, null); | |
111 |
1
1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
future.completedBy(task); |
112 |
1
1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
return task; |
113 | } | |
114 | ||
115 | private void handle(Throwable e) { | |
116 | shortCircuited = true; | |
117 | completionSignaller.completeExceptionally(e); | |
118 | } | |
119 | ||
120 | private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) { | |
121 |
1
1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
return throwable -> { |
122 | future.completeExceptionally(throwable); | |
123 | future.cancel(true); | |
124 | return null; | |
125 | }; | |
126 | } | |
127 | ||
128 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { | |
129 | ||
130 | private volatile FutureTask<?> backingTask; | |
131 | ||
132 | private void completedBy(FutureTask<Void> task) { | |
133 | backingTask = task; | |
134 | } | |
135 | ||
136 | @Override | |
137 | public boolean cancel(boolean mayInterruptIfRunning) { | |
138 |
1
1. cancel : negated conditional → KILLED |
if (backingTask != null) { |
139 | backingTask.cancel(mayInterruptIfRunning); | |
140 | } | |
141 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); |
142 | } | |
143 | } | |
144 | ||
145 | private static void retry(Runnable runnable) { | |
146 | try { | |
147 |
1
1. retry : removed call to java/lang/Runnable::run → TIMED_OUT |
runnable.run(); |
148 | } catch (RejectedExecutionException e) { | |
149 |
1
1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED |
Thread.onSpinWait(); |
150 |
1
1. retry : removed call to java/lang/Runnable::run → KILLED |
runnable.run(); |
151 | } | |
152 | } | |
153 | } | |
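Mutations | ||
37 | 1.1 | <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
43 | 1.1 | <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
49 | 1.1 | start : negated conditional → TIMED_OUT |
54 | 1.1 | lambda$start$0 : negated conditional → KILLED |
55 | 1.1 | lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
58 | 1.1 | lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE |
61 | 1.1 | lambda$start$0 : negated conditional → TIMED_OUT |
62 | 1.1 | lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT |
62 | 2.2 | lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT |
64 | 1.1 | lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT |
66 | 1.1 | lambda$start$2 : negated conditional → TIMED_OUT |
67 | 1.1 | lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
76 | 1.1 | lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → KILLED |
84 | 1.1 | stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED |
91 | 1.1 | isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED |
91 | 2.2 | isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT |
98 | 1.1 | enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
104 | 1.1 | lambda$completionTask$0 : negated conditional → TIMED_OUT |
108 | 1.1 | lambda$completionTask$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT |
111 | 1.1 | completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
112 | 1.1 | completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
121 | 1.1 | shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
138 | 1.1 | cancel : negated conditional → KILLED |
141 | 1.1 | cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
141 | 2.2 | cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
147 | 1.1 | retry : removed call to java/lang/Runnable::run → TIMED_OUT |
149 | 1.1 | retry : removed call to java/lang/Thread::onSpinWait → SURVIVED |
150 | 1.1 | retry : removed call to java/lang/Runnable::run → KILLED |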