Dispatcher.java

1
package com.pivovarit.collectors;
2
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.CompletableFuture;
5
import java.util.concurrent.Executor;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.FutureTask;
9
import java.util.concurrent.LinkedBlockingQueue;
10
import java.util.concurrent.RejectedExecutionException;
11
import java.util.concurrent.Semaphore;
12
import java.util.concurrent.ThreadFactory;
13
import java.util.concurrent.atomic.AtomicBoolean;
14
import java.util.function.Function;
15
import java.util.function.Supplier;
16
17
import static com.pivovarit.collectors.Preconditions.requireValidExecutor;
18
19
/**
20
 * @author Grzegorz Piwowarek
21
 */
22
final class Dispatcher<T> {
23
24
    private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?");
25
26
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();
27
    private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>();
28
29
    private final ThreadFactory dispatcherThreadFactory = Thread::startVirtualThread;
30
31
    private final Executor executor;
32
    private final Semaphore limiter;
33
34
    private final AtomicBoolean started = new AtomicBoolean(false);
35
36
    private volatile boolean shortCircuited = false;
37
38
    private Dispatcher() {
39
        this.executor = defaultExecutorService();
40
        this.limiter = null;
41
    }
42
43
    private Dispatcher(Executor executor, int permits) {
44 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
45
        this.executor = executor;
46
        this.limiter = new Semaphore(permits);
47
    }
48
49
    private Dispatcher(int permits) {
50
        this.executor = defaultExecutorService();
51
        this.limiter = new Semaphore(permits);
52
    }
53
54
    private Dispatcher(Executor executor) {
55 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
56
        this.executor = executor;
57
        this.limiter = null;
58
    }
59
60
    static <T> Dispatcher<T> from(Executor executor) {
61 1 1. from : replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED
        return new Dispatcher<>(executor);
62
    }
63
64
    static <T> Dispatcher<T> from(Executor executor, int permits) {
65 1 1. from : replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED
        return new Dispatcher<>(executor, permits);
66
    }
67
68
    static <T> Dispatcher<T> virtual() {
69 1 1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED
        return new Dispatcher<>();
70
    }
71
72
    static <T> Dispatcher<T> virtual(int permits) {
73 1 1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED
        return new Dispatcher<>(permits);
74
    }
75
76
    void start() {
77 1 1. start : negated conditional → TIMED_OUT
        if (!started.getAndSet(true)) {
78
            dispatcherThreadFactory.newThread(() -> {
79
                try {
80
                    while (true) {
81
                        try {
82 1 1. lambda$start$3 : negated conditional → KILLED
                            if (limiter != null) {
83 1 1. lambda$start$3 : removed call to java/util/concurrent/Semaphore::acquire → KILLED
                                limiter.acquire();
84
                            }
85
                        } catch (InterruptedException e) {
86 1 1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE
                            handle(e);
87
                        }
88
                        Runnable task;
89 1 1. lambda$start$3 : negated conditional → TIMED_OUT
                        if ((task = workingQueue.take()) != POISON_PILL) {
90 2 1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT
2. lambda$start$2 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT
                            retry(() -> executor.execute(() -> {
91
                                try {
92 1 1. lambda$start$1 : removed call to java/lang/Runnable::run → TIMED_OUT
                                    task.run();
93
                                } finally {
94 1 1. lambda$start$1 : negated conditional → TIMED_OUT
                                    if (limiter != null) {
95 1 1. lambda$start$1 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT
                                        limiter.release();
96
                                    }
97
                                }
98
                            }));
99
                        } else {
100
                            break;
101
                        }
102
                    }
103
                } catch (Throwable e) {
104 1 1. lambda$start$3 : removed call to com/pivovarit/collectors/Dispatcher::handle → KILLED
                    handle(e);
105
                }
106
            });
107
        }
108
    }
109
110
    void stop() {
111
        try {
112 1 1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED
            workingQueue.put(POISON_PILL);
113
        } catch (InterruptedException e) {
114
            completionSignaller.completeExceptionally(e);
115
        }
116
    }
117
118
    boolean isRunning() {
119 2 1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED
2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT
        return started.get();
120
    }
121
122
    CompletableFuture<T> enqueue(Supplier<T> supplier) {
123
        InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>();
124
        workingQueue.add(completionTask(supplier, future));
125
        completionSignaller.exceptionally(shortcircuit(future));
126 1 1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED
        return future;
127
    }
128
129
    private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) {
130
        FutureTask<Void> task = new FutureTask<>(() -> {
131
            try {
132 1 1. lambda$completionTask$4 : negated conditional → TIMED_OUT
                if (!shortCircuited) {
133
                    future.complete(supplier.get());
134
                }
135
            } catch (Throwable e) {
136 1 1. lambda$completionTask$4 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT
                handle(e);
137
            }
138
        }, null);
139 1 1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED
        future.completedBy(task);
140 1 1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED
        return task;
141
    }
142
143
    private void handle(Throwable e) {
144
        shortCircuited = true;
145
        completionSignaller.completeExceptionally(e);
146
    }
147
148
    private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) {
149 1 1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED
        return throwable -> {
150
            future.completeExceptionally(throwable);
151
            future.cancel(true);
152
            return null;
153
        };
154
    }
155
156
    static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {
157
158
        private volatile FutureTask<?> backingTask;
159
160
        private void completedBy(FutureTask<Void> task) {
161
            backingTask = task;
162
        }
163
164
        @Override
165
        public boolean cancel(boolean mayInterruptIfRunning) {
166 1 1. cancel : negated conditional → KILLED
            if (backingTask != null) {
167
                backingTask.cancel(mayInterruptIfRunning);
168
            }
169 2 1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
            return super.cancel(mayInterruptIfRunning);
170
        }
171
    }
172
173
    private static ExecutorService defaultExecutorService() {
174 1 1. defaultExecutorService : replaced return value with null for com/pivovarit/collectors/Dispatcher::defaultExecutorService → KILLED
        return Executors.newVirtualThreadPerTaskExecutor();
175
    }
176
177
    private static void retry(Runnable runnable) {
178
        try {
179 1 1. retry : removed call to java/lang/Runnable::run → TIMED_OUT
            runnable.run();
180
        } catch (RejectedExecutionException e) {
181 1 1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED
            Thread.onSpinWait();
182 1 1. retry : removed call to java/lang/Runnable::run → KILLED
            runnable.run();
183
        }
184
    }
185
}

Mutations

44

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

55

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

61

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED

65

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#32]
replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED

69

1.1
Location : virtual
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED

73

1.1
Location : virtual
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#53]
replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED

77

1.1
Location : start
Killed by : none
negated conditional → TIMED_OUT

82

1.1
Location : lambda$start$3
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
negated conditional → KILLED

83

1.1
Location : lambda$start$3
Killed by : com.pivovarit.collectors.test.ExecutorPollutionTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ExecutorPollutionTest]/[test-factory:shouldNotPolluteExecutorFactoryLimitedParallelism()]/[dynamic-test:#4]
removed call to java/util/concurrent/Semaphore::acquire → KILLED

86

1.1
Location : lambda$start$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE

89

1.1
Location : lambda$start$3
Killed by : none
negated conditional → TIMED_OUT

90

1.1
Location : lambda$start$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT

2.2
Location : lambda$start$2
Killed by : none
removed call to java/util/concurrent/Executor::execute → TIMED_OUT

92

1.1
Location : lambda$start$1
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

94

1.1
Location : lambda$start$1
Killed by : none
negated conditional → TIMED_OUT

95

1.1
Location : lambda$start$1
Killed by : none
removed call to java/util/concurrent/Semaphore::release → TIMED_OUT

104

1.1
Location : lambda$start$3
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerWhenParallelismOneFactory()]/[dynamic-test:#4]
removed call to com/pivovarit/collectors/Dispatcher::handle → KILLED

112

1.1
Location : stop
Killed by : none
removed call to java/util/concurrent/BlockingQueue::put → SURVIVED
Covering tests

119

1.1
Location : isRunning
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT

2.2
Location : isRunning
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED
Covering tests

126

1.1
Location : enqueue
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED

132

1.1
Location : lambda$completionTask$4
Killed by : none
negated conditional → TIMED_OUT

136

1.1
Location : lambda$completionTask$4
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT

139

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#15]
removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED

140

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED

149

1.1
Location : shortcircuit
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED

166

1.1
Location : cancel
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#15]
negated conditional → KILLED

169

1.1
Location : cancel
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
Covering tests

2.2
Location : cancel
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED Covering tests

174

1.1
Location : defaultExecutorService
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::defaultExecutorService → KILLED

179

1.1
Location : retry
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

181

1.1
Location : retry
Killed by : none
removed call to java/lang/Thread::onSpinWait → SURVIVED
Covering tests

182

1.1
Location : retry
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerWhenParallelismOneFactory()]/[dynamic-test:#4]
removed call to java/lang/Runnable::run → KILLED

Active mutators

Tests examined


Report generated by PIT 1.17.3