Dispatcher.java

1
/*
2
 * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 * https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.pivovarit.collectors;
17
18
import java.util.Objects;
19
import java.util.concurrent.BlockingQueue;
20
import java.util.concurrent.CompletableFuture;
21
import java.util.concurrent.Executor;
22
import java.util.concurrent.FutureTask;
23
import java.util.concurrent.LinkedBlockingQueue;
24
import java.util.concurrent.RejectedExecutionException;
25
import java.util.concurrent.Semaphore;
26
import java.util.concurrent.ThreadFactory;
27
import java.util.concurrent.atomic.AtomicBoolean;
28
import java.util.function.Consumer;
29
import java.util.function.Supplier;
30
31
import static com.pivovarit.collectors.Preconditions.requireValidExecutor;
32
33
/**
34
 * @author Grzegorz Piwowarek
35
 */
36
final class Dispatcher<T> {
37
38
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();
39
    private final BlockingQueue<DispatchItem> workingQueue = new LinkedBlockingQueue<>();
40
41
    private final ThreadFactory dispatcherThreadFactory = Thread.ofVirtual()
42
      .name("parallel-collectors-dispatcher-",0)
43
      .factory();
44
45
    private final Consumer<Thread> dispatcherThreadHook;
46
47
    private final Executor executor;
48
    private final Semaphore limiter;
49
50
    private final AtomicBoolean started = new AtomicBoolean(false);
51
    private final AtomicBoolean stopped = new AtomicBoolean(false);
52
53
    Dispatcher(Executor executor, int permits, Consumer<Thread> dispatcherThreadHook) {
54 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
55
        this.executor = executor;
56
        this.dispatcherThreadHook = dispatcherThreadHook;
57
        this.limiter = new Semaphore(permits);
58
    }
59
60
    Dispatcher(Executor executor, int permits) {
61
        this(executor, permits, c -> {});
62
    }
63
64
    Dispatcher(Executor executor) {
65
        this(executor, c -> {});
66
    }
67
68
    Dispatcher(Executor executor, Consumer<Thread> dispatcherThreadHook) {
69 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
70
        this.executor = executor;
71
        this.dispatcherThreadHook = dispatcherThreadHook;
72
        this.limiter = null;
73
    }
74
75
    void start() {
76 1 1. start : negated conditional → KILLED
        if (!started.getAndSet(true)) {
77
            var thread = dispatcherThreadFactory.newThread(() -> {
78
                try {
79
                    while (true) {
80
                        switch (workingQueue.take()) {
81
                            case DispatchItem.Task(Runnable task) -> {
82
                                try {
83 1 1. lambda$start$0 : negated conditional → KILLED
                                    if (limiter != null) {
84 1 1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED
                                        limiter.acquire();
85
                                    }
86
                                } catch (InterruptedException e) {
87
                                    completionSignaller.completeExceptionally(e);
88
                                    return;
89
                                }
90
                                try {
91 2 1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT
2. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT
                                    retry(() -> executor.execute(() -> {
92
                                        try {
93 1 1. lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT
                                            task.run();
94
                                        } finally {
95 1 1. lambda$start$2 : negated conditional → TIMED_OUT
                                            if (limiter != null) {
96 1 1. lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT
                                                limiter.release();
97
                                            }
98
                                        }
99
                                    }));
100
                                } catch (RejectedExecutionException e) {
101 1 1. lambda$start$0 : negated conditional → SURVIVED
                                    if (limiter != null) {
102 1 1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::release → SURVIVED
                                        limiter.release();
103
                                    }
104
105
                                    throw e;
106
                                }
107
                            }
108
                            case DispatchItem.Stop ignored -> {
109
                                return;
110
                            }
111
                        }
112
                    }
113
                } catch (Throwable e) {
114
                    completionSignaller.completeExceptionally(e);
115
                }
116
            });
117 1 1. start : removed call to java/util/function/Consumer::accept → KILLED
            dispatcherThreadHook.accept(thread);
118 1 1. start : removed call to java/lang/Thread::start → TIMED_OUT
            thread.start();
119
        }
120
    }
121
122
    void stop() {
123 1 1. stop : negated conditional → TIMED_OUT
        if (!stopped.getAndSet(true)) {
124
            try {
125 1 1. stop : removed call to java/util/concurrent/BlockingQueue::put → TIMED_OUT
                workingQueue.put(DispatchItem.Stop.POISON_PILL);
126
            } catch (InterruptedException e) {
127
                completionSignaller.completeExceptionally(e);
128
            }
129
        }
130
    }
131
132
    boolean wasStarted() {
133 2 1. wasStarted : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::wasStarted → SURVIVED
2. wasStarted : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::wasStarted → TIMED_OUT
        return started.get();
134
    }
135
136
    boolean wasStopped() {
137 2 1. wasStopped : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::wasStopped → SURVIVED
2. wasStopped : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::wasStopped → TIMED_OUT
        return stopped.get();
138
    }
139
140
    CompletableFuture<T> submit(Supplier<T> supplier) {
141
        InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>();
142
        completionSignaller.whenComplete((result, ex) -> {
143 1 1. lambda$submit$0 : negated conditional → TIMED_OUT
            if (ex != null) {
144
                future.completeExceptionally(ex);
145
                future.cancel(true);
146
            }
147
        });
148
        var task = new FutureTask<>(() -> {
149
            try {
150
                future.complete(supplier.get());
151
            } catch (Throwable e) {
152
                completionSignaller.completeExceptionally(e);
153
            }
154
        }, null);
155 1 1. submit : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED
        future.completedBy(task);
156
        workingQueue.add(new DispatchItem.Task(task));
157 1 1. submit : replaced return value with null for com/pivovarit/collectors/Dispatcher::submit → KILLED
        return future;
158
    }
159
160
    static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {
161
162
        private volatile FutureTask<?> backingTask;
163
164
        private void completedBy(FutureTask<?> task) {
165
            backingTask = task;
166
        }
167
168
        @Override
169
        public boolean cancel(boolean mayInterruptIfRunning) {
170 1 1. cancel : negated conditional → KILLED
            if (backingTask != null) {
171
                backingTask.cancel(mayInterruptIfRunning);
172
            }
173 2 1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
            return super.cancel(mayInterruptIfRunning);
174
        }
175
    }
176
177
    private static void retry(Runnable runnable) {
178
        try {
179 1 1. retry : removed call to java/lang/Runnable::run → TIMED_OUT
            runnable.run();
180
        } catch (RejectedExecutionException e) {
181 1 1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED
            Thread.onSpinWait();
182 1 1. retry : removed call to java/lang/Runnable::run → KILLED
            runnable.run();
183
        }
184
    }
185
186
    sealed interface DispatchItem permits DispatchItem.Task, DispatchItem.Stop {
187
        record Task(FutureTask<?> task) implements DispatchItem {
188
            public Task {
189
                Objects.requireNonNull(task);
190
            }
191
        }
192
193
        enum Stop implements DispatchItem {
194
            POISON_PILL
195
        }
196
    }
197
}

Mutations

54

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

69

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

76

1.1
Location : start
Killed by : com.pivovarit.collectors.DispatcherTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.DispatcherTest]/[method:shouldShutdownExecutorOnStop()]
negated conditional → KILLED

83

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElements()]/[dynamic-test:#27]
negated conditional → KILLED

84

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.ExecutorPollutionTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ExecutorPollutionTest]/[test-factory:shouldNotPolluteExecutorFactoryLimitedParallelism()]/[dynamic-test:#2]
removed call to java/util/concurrent/Semaphore::acquire → KILLED

91

1.1
Location : lambda$start$1
Killed by : none
removed call to java/util/concurrent/Executor::execute → TIMED_OUT

2.2
Location : lambda$start$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT

93

1.1
Location : lambda$start$2
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

95

1.1
Location : lambda$start$2
Killed by : none
negated conditional → TIMED_OUT

96

1.1
Location : lambda$start$2
Killed by : none
removed call to java/util/concurrent/Semaphore::release → TIMED_OUT

101

1.1
Location : lambda$start$0
Killed by : none
negated conditional → SURVIVED
Covering tests

102

1.1
Location : lambda$start$0
Killed by : none
removed call to java/util/concurrent/Semaphore::release → SURVIVED
Covering tests

117

1.1
Location : start
Killed by : com.pivovarit.collectors.DispatcherTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.DispatcherTest]/[method:shouldShutdownExecutorOnStop()]
removed call to java/util/function/Consumer::accept → KILLED

118

1.1
Location : start
Killed by : none
removed call to java/lang/Thread::start → TIMED_OUT

123

1.1
Location : stop
Killed by : none
negated conditional → TIMED_OUT

125

1.1
Location : stop
Killed by : none
removed call to java/util/concurrent/BlockingQueue::put → TIMED_OUT

133

1.1
Location : wasStarted
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher::wasStarted → TIMED_OUT

2.2
Location : wasStarted
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher::wasStarted → SURVIVED
Covering tests

137

1.1
Location : wasStopped
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher::wasStopped → SURVIVED
Covering tests

2.2
Location : wasStopped
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher::wasStopped → TIMED_OUT

143

1.1
Location : lambda$submit$0
Killed by : none
negated conditional → TIMED_OUT

155

1.1
Location : submit
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#27]
removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED

157

1.1
Location : submit
Killed by : com.pivovarit.collectors.test.NonBlockingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.NonBlockingTest]/[test-factory:shouldNotBlockTheCallingThread()]/[dynamic-test:#2]
replaced return value with null for com/pivovarit/collectors/Dispatcher::submit → KILLED

170

1.1
Location : cancel
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#27]
negated conditional → KILLED

173

1.1
Location : cancel
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
Covering tests

2.2
Location : cancel
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED Covering tests

179

1.1
Location : retry
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

181

1.1
Location : retry
Killed by : none
removed call to java/lang/Thread::onSpinWait → SURVIVED
Covering tests

182

1.1
Location : retry
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#7]
removed call to java/lang/Runnable::run → KILLED

Active mutators

Tests examined


Report generated by PIT 1.22.0