| 1 | /* | |
| 2 | * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ | |
| 3 | * | |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
| 5 | * you may not use this file except in compliance with the License. | |
| 6 | * You may obtain a copy of the License at | |
| 7 | * | |
| 8 | * https://www.apache.org/licenses/LICENSE-2.0 | |
| 9 | * | |
| 10 | * Unless required by applicable law or agreed to in writing, software | |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, | |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 13 | * See the License for the specific language governing permissions and | |
| 14 | * limitations under the License. | |
| 15 | */ | |
| 16 | package com.pivovarit.collectors; | |
| 17 | ||
| 18 | import java.util.Objects; | |
| 19 | import java.util.concurrent.BlockingQueue; | |
| 20 | import java.util.concurrent.CompletableFuture; | |
| 21 | import java.util.concurrent.Executor; | |
| 22 | import java.util.concurrent.FutureTask; | |
| 23 | import java.util.concurrent.LinkedBlockingQueue; | |
| 24 | import java.util.concurrent.RejectedExecutionException; | |
| 25 | import java.util.concurrent.Semaphore; | |
| 26 | import java.util.concurrent.ThreadFactory; | |
| 27 | import java.util.concurrent.atomic.AtomicBoolean; | |
| 28 | import java.util.function.Consumer; | |
| 29 | import java.util.function.Supplier; | |
| 30 | ||
| 31 | import static com.pivovarit.collectors.Preconditions.requireValidExecutor; | |
| 32 | ||
| 33 | /** | |
| 34 | * @author Grzegorz Piwowarek | |
| 35 | */ | |
| 36 | final class Dispatcher<T> { | |
| 37 | ||
| 38 | private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>(); /* Completed exceptionally when dispatching or a task fails; submit() hooks every returned future onto it. */ | |
| 39 | private final BlockingQueue<DispatchItem> workingQueue = new LinkedBlockingQueue<>(); /* Pending tasks plus the stop marker, taken one by one by the dispatcher thread. */ | |
| 40 | ||
| 41 | private final ThreadFactory dispatcherThreadFactory = Thread.ofVirtual() /* The dispatcher loop runs on a named virtual thread. */ | |
| 42 | .name("parallel-collectors-dispatcher-",0) | |
| 43 | .factory(); | |
| 44 | ||
| 45 | private final Consumer<Thread> dispatcherThreadHook; /* Called with the dispatcher thread just before that thread is started. */ | |
| 46 | ||
| 47 | private final Executor executor; /* Runs the submitted tasks. */ | |
| 48 | private final Semaphore limiter; /* Bounds the tasks handed to the executor and not yet finished; null when there is no bound. */ | |
| 49 | ||
| 50 | private final AtomicBoolean started = new AtomicBoolean(false); /* Flipped to true by the first start() call. */ | |
| 51 | private final AtomicBoolean stopped = new AtomicBoolean(false); /* Flipped to true by the first stop() call. */ | |
| 52 | ||
| 53 | Dispatcher(Executor executor, int permits, Consumer<Thread> dispatcherThreadHook) { /* Creates a dispatcher whose limiter is a Semaphore with the given number of permits; dispatcherThreadHook is later called with the dispatcher thread. */ | |
| 54 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); /* NOTE(review): the mutation record above marks removing this call as SURVIVED, which suggests no test depends on it - confirm coverage. */ |
| 55 | this.executor = executor; | |
| 56 | this.dispatcherThreadHook = dispatcherThreadHook; | |
| 57 | this.limiter = new Semaphore(permits); | |
| 58 | } | |
| 59 | ||
| 60 | Dispatcher(Executor executor, int permits) { /* Permit-limited dispatcher with a no-op dispatcher-thread hook. */ | |
| 61 | this(executor, permits, c -> {}); | |
| 62 | } | |
| 63 | ||
| 64 | Dispatcher(Executor executor) { /* Dispatcher with a no-op dispatcher-thread hook; delegates to the constructor that leaves limiter null. */ | |
| 65 | this(executor, c -> {}); | |
| 66 | } | |
| 67 | ||
| 68 | Dispatcher(Executor executor, Consumer<Thread> dispatcherThreadHook) { /* Creates a dispatcher without a permit limit; dispatcherThreadHook is later called with the dispatcher thread. */ | |
| 69 |
1
1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED |
requireValidExecutor(executor); /* NOTE(review): the mutation record above marks removing this call as SURVIVED, which suggests no test depends on it - confirm coverage. */ |
| 70 | this.executor = executor; | |
| 71 | this.dispatcherThreadHook = dispatcherThreadHook; | |
| 72 | this.limiter = null; /* null is the "no limit" marker checked in start(). */ | |
| 73 | } | |
| 74 | ||
| 75 | void start() { /* Creates and starts the dispatcher thread; only the first call has any effect. */ | |
| 76 |
1
1. start : negated conditional → KILLED |
if (!started.getAndSet(true)) { |
| 77 | var thread = dispatcherThreadFactory.newThread(() -> { | |
| 78 | try { | |
| 79 | while (true) { /* Keep taking items until the stop marker arrives or something throws. */ | |
| 80 | switch (workingQueue.take()) { | |
| 81 | case DispatchItem.Task(Runnable task) -> { | |
| 82 | try { | |
| 83 |
1
1. lambda$start$0 : negated conditional → KILLED |
if (limiter != null) { |
| 84 |
1
1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED |
limiter.acquire(); /* Blocks the dispatcher thread until a permit is available. */ |
| 85 | } | |
| 86 | } catch (InterruptedException e) { | |
| 87 | completionSignaller.completeExceptionally(e); | |
| 88 | return; /* Interrupted while waiting for a permit: signal the failure and end the dispatcher thread. */ | |
| 89 | } | |
| 90 | try { | |
| 91 |
2
1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT 2. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT |
retry(() -> executor.execute(() -> { /* Hand the task to the executor; retry() repeats the hand-off once if it is rejected. */ |
| 92 | try { | |
| 93 |
1
1. lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT |
task.run(); |
| 94 | } finally { | |
| 95 |
1
1. lambda$start$2 : negated conditional → TIMED_OUT |
if (limiter != null) { |
| 96 |
1
1. lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT |
limiter.release(); /* Return the permit once the task has run, whether or not it threw. */ |
| 97 | } | |
| 98 | } | |
| 99 | })); | |
| 100 | } catch (RejectedExecutionException e) { /* Reached only when the second attempt made by retry() is rejected as well. */ | |
| 101 |
1
1. lambda$start$0 : negated conditional → SURVIVED |
if (limiter != null) { |
| 102 |
1
1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::release → SURVIVED |
limiter.release(); /* The task was never accepted, so give back the permit acquired above. */ |
| 103 | } | |
| 104 | ||
| 105 | throw e; /* Propagates to the outer catch, which completes completionSignaller exceptionally. */ | |
| 106 | } | |
| 107 | } | |
| 108 | case DispatchItem.Stop ignored -> { | |
| 109 | return; /* Stop marker: end the dispatcher thread normally. */ | |
| 110 | } | |
| 111 | } | |
| 112 | } | |
| 113 | } catch (Throwable e) { | |
| 114 | completionSignaller.completeExceptionally(e); | |
| 115 | } | |
| 116 | }); | |
| 117 |
1
1. start : removed call to java/util/function/Consumer::accept → KILLED |
dispatcherThreadHook.accept(thread); /* The hook receives the thread before it is started. */ |
| 118 |
1
1. start : removed call to java/lang/Thread::start → TIMED_OUT |
thread.start(); |
| 119 | } | |
| 120 | } | |
| 121 | ||
| 122 | void stop() { /* Enqueues the stop marker that makes the dispatcher loop return; only the first call has any effect. */ | |
| 123 |
1
1. stop : negated conditional → TIMED_OUT |
if (!stopped.getAndSet(true)) { |
| 124 | try { | |
| 125 |
1
1. stop : removed call to java/util/concurrent/BlockingQueue::put → TIMED_OUT |
workingQueue.put(DispatchItem.Stop.POISON_PILL); |
| 126 | } catch (InterruptedException e) { | |
| 127 | completionSignaller.completeExceptionally(e); /* NOTE(review): on this path the interrupt status is not restored and the stop marker was not enqueued - confirm this is intended. */ | |
| 128 | } | |
| 129 | } | |
| 130 | } | |
| 131 | ||
| 132 | boolean wasStarted() { /* Returns true once start() has been called. */ | |
| 133 |
2
1. wasStarted : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::wasStarted → SURVIVED 2. wasStarted : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::wasStarted → TIMED_OUT |
return started.get(); |
| 134 | } | |
| 135 | ||
| 136 | boolean wasStopped() { /* Returns true once stop() has been called. */ | |
| 137 |
2
1. wasStopped : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::wasStopped → SURVIVED 2. wasStopped : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::wasStopped → TIMED_OUT |
return stopped.get(); |
| 138 | } | |
| 139 | ||
| 140 | CompletableFuture<T> submit(Supplier<T> supplier) { /* Queues supplier for execution and returns a future that is completed with the value it produces. */ | |
| 141 | InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>(); | |
| 142 | completionSignaller.whenComplete((result, ex) -> { /* A failure recorded on completionSignaller also fails and cancels this future. */ | |
| 143 |
1
1. lambda$submit$0 : negated conditional → TIMED_OUT |
if (ex != null) { |
| 144 | future.completeExceptionally(ex); | |
| 145 | future.cancel(true); /* The overridden cancel() also cancels the backing task registered below. */ | |
| 146 | } | |
| 147 | }); | |
| 148 | var task = new FutureTask<>(() -> { | |
| 149 | try { | |
| 150 | future.complete(supplier.get()); | |
| 151 | } catch (Throwable e) { | |
| 152 | completionSignaller.completeExceptionally(e); /* A failing supplier is reported through the shared signaller, not directly on this future. */ | |
| 153 | } | |
| 154 | }, null); | |
| 155 |
1
1. submit : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED |
future.completedBy(task); /* Links the future to its task so that cancelling the future reaches the task. */ |
| 156 | workingQueue.add(new DispatchItem.Task(task)); | |
| 157 |
1
1. submit : replaced return value with null for com/pivovarit/collectors/Dispatcher::submit → KILLED |
return future; |
| 158 | } | |
| 159 | ||
| 160 | static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> { /* CompletableFuture whose cancel() also cancels the FutureTask registered through completedBy(). */ | |
| 161 | ||
| 162 | private volatile FutureTask<?> backingTask; /* Stays null until completedBy() is called. */ | |
| 163 | ||
| 164 | private void completedBy(FutureTask<?> task) { | |
| 165 | backingTask = task; | |
| 166 | } | |
| 167 | ||
| 168 | @Override | |
| 169 | public boolean cancel(boolean mayInterruptIfRunning) { | |
| 170 |
1
1. cancel : negated conditional → KILLED |
if (backingTask != null) { |
| 171 | backingTask.cancel(mayInterruptIfRunning); /* Forward the cancellation, and the interrupt flag, to the backing task first. */ | |
| 172 | } | |
| 173 |
2
1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED 2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED |
return super.cancel(mayInterruptIfRunning); /* NOTE(review): both return-value mutants above SURVIVED, which suggests no test checks the value returned here - confirm coverage. */ |
| 174 | } | |
| 175 | } | |
| 176 | ||
| 177 | private static void retry(Runnable runnable) { /* Runs runnable; if it throws RejectedExecutionException, runs it exactly one more time. A second rejection propagates to the caller. */ | |
| 178 | try { | |
| 179 |
1
1. retry : removed call to java/lang/Runnable::run → TIMED_OUT |
runnable.run(); |
| 180 | } catch (RejectedExecutionException e) { | |
| 181 |
1
1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED |
Thread.onSpinWait(); /* Spin-wait hint issued before the single retry. */ |
| 182 |
1
1. retry : removed call to java/lang/Runnable::run → KILLED |
runnable.run(); |
| 183 | } | |
| 184 | } | |
| 185 | ||
| 186 | sealed interface DispatchItem permits DispatchItem.Task, DispatchItem.Stop { /* Items placed on the working queue: either a task to run or the stop marker. */ | |
| 187 | record Task(FutureTask<?> task) implements DispatchItem { | |
| 188 | public Task { | |
| 189 | Objects.requireNonNull(task); /* A Task cannot be created with a null FutureTask. */ | |
| 190 | } | |
| 191 | } | |
| 192 | ||
| 193 | enum Stop implements DispatchItem { | |
| 194 | POISON_PILL /* The marker enqueued by stop(); the dispatcher loop returns when it takes it. */ | |
| 195 | } | |
| 196 | } | |
| 197 | } | |
Mutations | ||
| 54 |
1.1 |
|
| 69 |
1.1 |
|
| 76 |
1.1 |
|
| 83 |
1.1 |
|
| 84 |
1.1 |
|
| 91 |
1.1 2.2 |
|
| 93 |
1.1 |
|
| 95 |
1.1 |
|
| 96 |
1.1 |
|
| 101 |
1.1 |
|
| 102 |
1.1 |
|
| 117 |
1.1 |
|
| 118 |
1.1 |
|
| 123 |
1.1 |
|
| 125 |
1.1 |
|
| 133 |
1.1 2.2 |
|
| 137 |
1.1 2.2 |
|
| 143 |
1.1 |
|
| 155 |
1.1 |
|
| 157 |
1.1 |
|
| 170 |
1.1 |
|
| 173 |
1.1 2.2 |
|
| 179 |
1.1 |
|
| 181 |
1.1 |
|
| 182 |
1.1 |