| 1 | package com.pivovarit.collectors; | |
| 2 | ||
| 3 | import java.util.concurrent.BlockingQueue; | |
| 4 | import java.util.concurrent.CompletableFuture; | |
| 5 | import java.util.concurrent.Executor; | |
| 6 | import java.util.concurrent.FutureTask; | |
| 7 | import java.util.concurrent.LinkedBlockingQueue; | |
| 8 | import java.util.concurrent.RejectedExecutionException; | |
| 9 | import java.util.concurrent.Semaphore; | |
| 10 | import java.util.concurrent.ThreadFactory; | |
| 11 | import java.util.concurrent.atomic.AtomicBoolean; | |
| 12 | import java.util.function.Function; | |
| 13 | import java.util.function.Supplier; | |
| 14 | ||
| 15 | import static com.pivovarit.collectors.Preconditions.requireValidExecutor; | |
| 16 | ||
| 17 | /** | |
| 18 | * @author Grzegorz Piwowarek | |
| 19 | */ | |
/*
 * Dispatcher: hands queued tasks over to the supplied Executor from one dispatcher thread,
 * optionally bounding the number of in-flight tasks with a Semaphore.
 *
 * Reading note: this listing is embedded in a mutation-testing report. Rows of the form
 * "N. method : mutation -> STATUS" describe mutations applied to the statement printed right
 * below them; they are report annotations, not part of the class.
 */
| 20 | final class Dispatcher<T> { | |
| 21 | ||
// Sentinel queued by stop(); the dispatcher loop compares against it by identity and exits.
// Nothing in this class ever calls run() on it, so the println body is not executed from here.
| 22 | private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?"); | |
| 23 | ||
// Completed exceptionally by handleException() and by stop() on interruption; every future
// returned from enqueue() is subscribed to it through shortcircuit().
| 24 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); | |
// Pending work, consumed by the dispatcher loop; created without a capacity argument.
| 25 | private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>(); | |
| 26 | ||
// Thread.startVirtualThread returns a thread that is already started, so calling
// newThread(...) on this factory in start() both creates and launches the dispatcher thread.
| 27 | private final ThreadFactory dispatcherThreadFactory = Thread::startVirtualThread; | |
| 28 | ||
| 29 | private final Executor executor; | |
// null when constructed through Dispatcher(Executor): no limit on in-flight tasks.
| 30 | private final Semaphore limiter; | |
| 31 | ||
// Flipped to true by the first start() call; never reset anywhere in this class.
| 32 | private final AtomicBoolean started = new AtomicBoolean(false); | |
| 33 | ||
/*
 * Creates a dispatcher that allows at most `permits` tasks to be handed to the executor and
 * not yet finished at any time.
 * requireValidExecutor is defined in Preconditions (not shown here) - presumably it rejects
 * unsupported executors; confirm against that class.
 */
| 34 | Dispatcher(Executor executor, int permits) { | |
| 35 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
| 36 | this.executor = executor; | |
| 37 | this.limiter = new Semaphore(permits); | |
| 38 | } | |
| 39 | ||
/*
 * Creates a dispatcher without a concurrency limit (limiter stays null).
 */
| 40 | Dispatcher(Executor executor) { | |
| 41 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); |
| 42 | this.executor = executor; | |
| 43 | this.limiter = null; | |
| 44 | } | |
| 45 | ||
/*
 * Starts the dispatcher thread. Only the first call has an effect (guarded by `started`).
 *
 * Loop, per iteration:
 *   1. acquire a permit when a limiter is configured,
 *   2. block on the queue for the next task,
 *   3. stop on POISON_PILL, otherwise hand the task to the executor through retry();
 *      the submitted wrapper releases the permit in a finally block once the task has run.
 * Any Throwable escaping the loop (including InterruptedException from take()) ends the
 * thread and is routed to handleException().
 *
 * NOTE(review): the permit acquired right before POISON_PILL is taken is not released, and
 * neither is the permit held when retry() fails on its second attempt - confirm this is
 * acceptable because the dispatcher is finished at that point.
 */
| 46 | void start() { | |
| 47 |
1
1. start : negated conditional → TIMED_OUT |
if (!started.getAndSet(true)) { |
| 48 | dispatcherThreadFactory.newThread(() -> { | |
| 49 | try { | |
| 50 | while (true) { | |
| 51 | try { | |
| 52 |
1
1. lambda$start$0 : negated conditional → KILLED |
if (limiter != null) { |
| 53 |
1
1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); |
| 54 | } | |
// NOTE(review): this catch sits inside the while loop, so after an interrupted acquire()
// execution still falls through to take() and dispatch below; the dispatched task's finally
// block would then release a permit that was never acquired - confirm this is intended.
| 55 | } catch (InterruptedException e) { | |
| 56 |
1
1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → NO_COVERAGE |
handleException(e); |
| 57 | } | |
| 58 | Runnable task; | |
| 59 |
1
1. lambda$start$0 : negated conditional → TIMED_OUT |
if ((task = workingQueue.take()) != POISON_PILL) { |
| 60 |
2
1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT 2. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT |
retry(() -> executor.execute(() -> { |
| 61 | try { | |
| 62 |
1
1. lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT |
task.run(); |
| 63 | } finally { | |
| 64 |
1
1. lambda$start$2 : negated conditional → TIMED_OUT |
if (limiter != null) { |
| 65 |
1
1. lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
limiter.release(); |
| 66 | } | |
| 67 | } | |
| 68 | })); | |
| 69 | } else { | |
| 70 | break; | |
| 71 | } | |
| 72 | } | |
| 73 | } catch (Throwable e) { | |
| 74 |
1
1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → KILLED |
handleException(e); |
| 75 | } | |
| 76 | }); | |
| 77 | } | |
| 78 | } | |
| 79 | ||
/*
 * Requests termination of the dispatcher loop by queueing POISON_PILL behind the tasks that
 * are already enqueued. If the put is interrupted, completionSignaller is failed instead.
 *
 * NOTE(review): the interrupt flag is not restored in the catch block - consider whether
 * callers rely on seeing the interruption.
 */
| 80 | void stop() { | |
| 81 | try { | |
| 82 |
1
1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED |
workingQueue.put(POISON_PILL); |
| 83 | } catch (InterruptedException e) { | |
| 84 | completionSignaller.completeExceptionally(e); | |
| 85 | } | |
| 86 | } | |
| 87 | ||
/*
 * Reports whether start() has been called. Because `started` is never set back to false in
 * this class, the result stays true after stop() and after the dispatcher thread has exited.
 */
| 88 | boolean isRunning() { | |
| 89 |
2
1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED 2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT |
return started.get(); |
| 90 | } | |
| 91 | ||
/*
 * Queues `supplier` for execution and returns the future that will receive its result.
 * The future is also subscribed to completionSignaller, so a failure recorded there completes
 * the returned future exceptionally and cancels its backing task (see shortcircuit()).
 */
| 92 | CompletableFuture<T> enqueue(Supplier<T> supplier) { | |
| 93 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
| 94 | workingQueue.add(completionTask(supplier, future)); | |
| 95 | completionSignaller.exceptionally(shortcircuit(future)); | |
| 96 |
1
1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED |
return future; |
| 97 | } | |
| 98 | ||
/*
 * Wraps `supplier` in a FutureTask that completes `future` with the supplied value.
 * Anything thrown while producing or publishing the value goes to handleException() rather
 * than being stored in the FutureTask. The task is registered on the future via completedBy()
 * so that cancelling the future also cancels the task.
 */
| 99 | private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) { | |
| 100 | FutureTask<Void> task = new FutureTask<>(() -> { | |
| 101 | try { | |
| 102 | future.complete(supplier.get()); | |
| 103 | } catch (Throwable e) { | |
| 104 |
1
1. lambda$completionTask$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → TIMED_OUT |
handleException(e); |
| 105 | } | |
| 106 | }, null); | |
| 107 |
1
1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
future.completedBy(task); |
| 108 |
1
1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED |
return task; |
| 109 | } | |
| 110 | ||
/*
 * Central failure path: fails completionSignaller with `e`, then cancels (with interruption
 * allowed) every FutureTask still waiting in the queue. Non-FutureTask entries such as
 * POISON_PILL are skipped by the instanceof check.
 */
| 111 | private void handleException(Throwable e) { | |
| 112 | completionSignaller.completeExceptionally(e); | |
| 113 | ||
| 114 | for (Runnable runnable : workingQueue) { | |
| 115 |
1
1. handleException : negated conditional → SURVIVED |
if (runnable instanceof FutureTask<?> task) { |
| 116 | task.cancel(true); | |
| 117 | } | |
| 118 | } | |
| 119 | } | |
| 120 | ||
/*
 * Builds the callback attached to completionSignaller in enqueue(): it completes `future`
 * exceptionally with the received throwable and then calls cancel(true) on it, which also
 * cancels the future's backing task when one is set. The callback's own result is always null.
 */
| 121 | private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) { | |
| 122 |
1
1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED |
return throwable -> { |
| 123 | future.completeExceptionally(throwable); | |
| 124 | future.cancel(true); | |
| 125 | return null; | |
| 126 | }; | |
| 127 | } | |
| 128 | ||
/*
 * CompletableFuture that remembers the FutureTask producing its value, so that cancel() can
 * be forwarded to that task before the future itself is cancelled.
 */
| 129 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { | |
| 130 | ||
// Set once through completedBy(); volatile because cancel() may be called from another thread
// than the one that registered the task.
| 131 | private volatile FutureTask<?> backingTask; | |
| 132 | ||
| 133 | private void completedBy(FutureTask<Void> task) { | |
| 134 | backingTask = task; | |
| 135 | } | |
| 136 | ||
// Cancels the backing task first (if one has been registered), then delegates to
// CompletableFuture.cancel and returns its result.
| 137 | @Override | |
| 138 | public boolean cancel(boolean mayInterruptIfRunning) { | |
| 139 |
1
1. cancel : negated conditional → KILLED |
if (backingTask != null) { |
| 140 | backingTask.cancel(mayInterruptIfRunning); | |
| 141 | } | |
| 142 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); |
| 143 | } | |
| 144 | } | |
| 145 | ||
/*
 * Runs `runnable`; if it throws RejectedExecutionException, hints a spin-wait and runs it
 * exactly one more time. A rejection on the second attempt propagates to the caller.
 */
| 146 | private static void retry(Runnable runnable) { | |
| 147 | try { | |
| 148 |
1
1. retry : removed call to java/lang/Runnable::run → TIMED_OUT |
runnable.run(); |
| 149 | } catch (RejectedExecutionException e) { | |
| 150 |
1
1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED |
Thread.onSpinWait(); |
| 151 |
1
1. retry : removed call to java/lang/Runnable::run → KILLED |
runnable.run(); |
| 152 | } | |
| 153 | } | |
| 154 | } | |
Mutations | ||
| 35 |
1.1 |
|
| 41 |
1.1 |
|
| 47 |
1.1 |
|
| 52 |
1.1 |
|
| 53 |
1.1 |
|
| 56 |
1.1 |
|
| 59 |
1.1 |
|
| 60 |
1.1 2.2 |
|
| 62 |
1.1 |
|
| 64 |
1.1 |
|
| 65 |
1.1 |
|
| 74 |
1.1 |
|
| 82 |
1.1 |
|
| 89 |
1.1 2.2 |
|
| 96 |
1.1 |
|
| 104 |
1.1 |
|
| 107 |
1.1 |
|
| 108 |
1.1 |
|
| 115 |
1.1 |
|
| 122 |
1.1 |
|
| 139 |
1.1 |
|
| 142 |
1.1 2.2 |
|
| 148 |
1.1 |
|
| 150 |
1.1 |
|
| 151 |
1.1 |