1 | package com.pivovarit.collectors; | |
2 | ||
3 | import java.util.concurrent.BlockingQueue; | |
4 | import java.util.concurrent.CompletableFuture; | |
5 | import java.util.concurrent.Executor; | |
6 | import java.util.concurrent.ExecutorService; | |
7 | import java.util.concurrent.Executors; | |
8 | import java.util.concurrent.FutureTask; | |
9 | import java.util.concurrent.LinkedBlockingQueue; | |
10 | import java.util.concurrent.Semaphore; | |
11 | import java.util.concurrent.SynchronousQueue; | |
12 | import java.util.concurrent.ThreadPoolExecutor; | |
13 | import java.util.concurrent.TimeUnit; | |
14 | import java.util.concurrent.atomic.AtomicBoolean; | |
15 | import java.util.function.Function; | |
16 | import java.util.function.Supplier; | |
17 | ||
18 | import static java.lang.Runtime.getRuntime; | |
19 | ||
20 | /** | |
21 | * @author Grzegorz Piwowarek | |
22 | */ | |
23 | final class Dispatcher<T> { | |
24 | ||
25 |
1
1. lambda$static$0 : removed call to java/io/PrintStream::println → NO_COVERAGE |
private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?"); |
26 | ||
27 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); | |
28 | ||
29 | private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>(); | |
30 | ||
31 | private final ExecutorService dispatcher = newLazySingleThreadExecutor(); | |
32 | private final Executor executor; | |
33 | private final Semaphore limiter; | |
34 | ||
35 | private final AtomicBoolean started = new AtomicBoolean(false); | |
36 | ||
37 | private volatile boolean shortCircuited = false; | |
38 | ||
39 | private Dispatcher(Executor executor, int permits) { | |
40 | this.executor = executor; | |
41 | this.limiter = new Semaphore(permits); | |
42 | } | |
43 | ||
44 | static <T> Dispatcher<T> of(Executor executor, int permits) { | |
45 |
1
1. of : replaced return value with null for com/pivovarit/collectors/Dispatcher::of → KILLED |
return new Dispatcher<>(executor, permits); |
46 | } | |
47 | ||
48 | void start() { | |
49 |
1
1. start : negated conditional → KILLED |
if (!started.getAndSet(true)) { |
50 |
1
1. start : removed call to java/util/concurrent/ExecutorService::execute → TIMED_OUT |
dispatcher.execute(() -> { |
51 | try { | |
52 | while (true) { | |
53 | Runnable task; | |
54 |
1
1. lambda$start$1 : negated conditional → TIMED_OUT |
if ((task = workingQueue.take()) != POISON_PILL) { |
55 |
1
1. lambda$start$1 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); |
56 |
1
1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT |
executor.execute(withFinally(task, limiter::release)); |
57 | } else { | |
58 | break; | |
59 | } | |
60 | } | |
61 | } catch (Throwable e) { | |
62 |
1
1. lambda$start$1 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT |
handle(e); |
63 | } | |
64 | }); | |
65 | } | |
66 | } | |
67 | ||
68 | void stop() { | |
69 | try { | |
70 |
1
1. stop : removed call to java/util/concurrent/BlockingQueue::put → KILLED |
workingQueue.put(POISON_PILL); |
71 | } catch (InterruptedException e) { | |
72 | completionSignaller.completeExceptionally(e); | |
73 | } finally { | |
74 |
1
1. stop : removed call to java/util/concurrent/ExecutorService::shutdown → KILLED |
dispatcher.shutdown(); |
75 | } | |
76 | } | |
77 | ||
78 | boolean isRunning() { | |
79 |
2
1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED 2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT |
return started.get(); |
80 | } | |
81 | ||
82 | CompletableFuture<T> enqueue(Supplier<T> supplier) { | |
83 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
84 | workingQueue.add(completionTask(supplier, future)); | |
85 | completionSignaller.exceptionally(shortcircuit(future)); | |
86 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return future; |
87 | } | |
88 | ||
89 | private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) { | |
90 | FutureTask<Void> task = new FutureTask<>(() -> { | |
91 | try { | |
92 |
1
1. lambda$completionTask$2 : negated conditional → TIMED_OUT |
if (!shortCircuited) { |
93 | future.complete(supplier.get()); | |
94 | } | |
95 | } catch (Throwable e) { | |
96 |
1
1. lambda$completionTask$2 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT |
handle(e); |
97 | } | |
98 | }, null); | |
99 |
1
1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::access$000 → KILLED |
future.completedBy(task); |
100 |
1
1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
return task; |
101 | } | |
102 | ||
103 | private void handle(Throwable e) { | |
104 | shortCircuited = true; | |
105 | completionSignaller.completeExceptionally(e); | |
106 | dispatcher.shutdownNow(); | |
107 | } | |
108 | ||
109 | private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) { | |
110 |
1
1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
return throwable -> { |
111 | future.completeExceptionally(throwable); | |
112 | future.cancel(true); | |
113 | return null; | |
114 | }; | |
115 | } | |
116 | ||
117 | private static Runnable withFinally(Runnable task, Runnable finisher) { | |
118 |
1
1. withFinally : replaced return value with null for com/pivovarit/collectors/Dispatcher::withFinally → KILLED |
return () -> { |
119 | try { | |
120 |
1
1. lambda$withFinally$4 : removed call to java/lang/Runnable::run → TIMED_OUT |
task.run(); |
121 | } finally { | |
122 |
1
1. lambda$withFinally$4 : removed call to java/lang/Runnable::run → TIMED_OUT |
finisher.run(); |
123 | } | |
124 | }; | |
125 | } | |
126 | ||
127 | static int getDefaultParallelism() { | |
128 |
2
1. getDefaultParallelism : Replaced integer subtraction with addition → NO_COVERAGE 2. getDefaultParallelism : replaced int return with 0 for com/pivovarit/collectors/Dispatcher::getDefaultParallelism → NO_COVERAGE |
return Math.max(getRuntime().availableProcessors() - 1, 4); |
129 | } | |
130 | ||
131 | private static ThreadPoolExecutor newLazySingleThreadExecutor() { | |
132 |
1
1. newLazySingleThreadExecutor : replaced return value with null for com/pivovarit/collectors/Dispatcher::newLazySingleThreadExecutor → KILLED |
return new ThreadPoolExecutor(0, 1, |
133 | 0L, TimeUnit.MILLISECONDS, | |
134 | new SynchronousQueue<>(), | |
135 | task -> { | |
136 | Thread thread = Executors.defaultThreadFactory().newThread(task); | |
137 |
1
1. lambda$newLazySingleThreadExecutor$5 : removed call to java/lang/Thread::setName → SURVIVED |
thread.setName("parallel-collector-" + thread.getName()); |
138 |
1
1. lambda$newLazySingleThreadExecutor$5 : removed call to java/lang/Thread::setDaemon → SURVIVED |
thread.setDaemon(false); |
139 |
1
1. lambda$newLazySingleThreadExecutor$5 : replaced return value with null for com/pivovarit/collectors/Dispatcher::lambda$newLazySingleThreadExecutor$5 → KILLED |
return thread; |
140 | }); | |
141 | } | |
142 | ||
143 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { | |
144 | private volatile FutureTask<?> backingTask; | |
145 | ||
146 | private void completedBy(FutureTask<Void> task) { | |
147 | backingTask = task; | |
148 | } | |
149 | ||
150 | @Override | |
151 | public boolean cancel(boolean mayInterruptIfRunning) { | |
152 |
1
1. cancel : negated conditional → KILLED |
if (backingTask != null) { |
153 | backingTask.cancel(mayInterruptIfRunning); | |
154 | } | |
155 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); |
156 | } | |
157 | } | |
158 | } | |
Mutations | ||
25 |
1.1 |
|
45 |
1.1 |
|
49 |
1.1 |
|
50 |
1.1 |
|
54 |
1.1 |
|
55 |
1.1 |
|
56 |
1.1 |
|
62 |
1.1 |
|
70 |
1.1 |
|
74 |
1.1 |
|
79 |
1.1 2.2 |
|
86 |
1.1 |
|
92 |
1.1 |
|
96 |
1.1 |
|
99 |
1.1 |
|
100 |
1.1 |
|
110 |
1.1 |
|
118 |
1.1 |
|
120 |
1.1 |
|
122 |
1.1 |
|
128 |
1.1 2.2 |
|
132 |
1.1 |
|
137 |
1.1 |
|
138 |
1.1 |
|
139 |
1.1 |
|
152 |
1.1 |
|
155 |
1.1 2.2 |