Dispatcher.java

1
package com.pivovarit.collectors;
2
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.CompletableFuture;
5
import java.util.concurrent.Executor;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.FutureTask;
9
import java.util.concurrent.LinkedBlockingQueue;
10
import java.util.concurrent.RejectedExecutionException;
11
import java.util.concurrent.Semaphore;
12
import java.util.concurrent.ThreadPoolExecutor;
13
import java.util.concurrent.atomic.AtomicBoolean;
14
import java.util.function.Function;
15
import java.util.function.Supplier;
16
17
/**
 * Feeds queued tasks to an {@link Executor} from a single dedicated virtual thread,
 * optionally bounding the number of tasks in flight with a {@link Semaphore}.
 *
 * <p>Tasks are added with {@link #enqueue(Supplier)}; the dispatcher thread is launched by
 * {@link #start()} and is asked to finish by {@link #stop()}, which queues a sentinel task.
 * Any failure observed by the dispatcher or by a task fails an internal completion signaller,
 * which in turn fails and cancels every future handed out by {@link #enqueue(Supplier)}.
 *
 * @param <T> type of the results produced by the enqueued suppliers
 * @author Grzegorz Piwowarek
 */
final class Dispatcher<T> {

    // Sentinel recognised by identity in the dispatcher loop; the loop never runs its body.
    private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?");

    // Completed exceptionally on the first failure; every enqueued future listens to it.
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();
    private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>();

    private final Executor executor;
    // null means "no limit on in-flight tasks".
    private final Semaphore limiter;

    private final AtomicBoolean started = new AtomicBoolean(false);

    // Once set, tasks that have not started yet skip their supplier.
    private volatile boolean shortCircuited = false;

    /** Creates a dispatcher backed by the default executor, without a limiter. */
    private Dispatcher() {
        this.executor = defaultExecutorService();
        this.limiter = null;
    }

    /**
     * Creates a dispatcher backed by the given executor and limited to {@code permits}
     * in-flight tasks.
     *
     * @throws IllegalArgumentException if the executor is a {@link ThreadPoolExecutor}
     *                                  whose rejection handler discards tasks
     */
    private Dispatcher(Executor executor, int permits) {
        requireValidExecutor(executor);
        this.executor = executor;
        this.limiter = new Semaphore(permits);
    }

    /** Creates a dispatcher backed by the default executor and limited to {@code permits} in-flight tasks. */
    private Dispatcher(int permits) {
        this.executor = defaultExecutorService();
        this.limiter = new Semaphore(permits);
    }

    /** Returns a dispatcher that runs tasks on {@code executor} with at most {@code permits} in flight. */
    static <T> Dispatcher<T> from(Executor executor, int permits) {
        return new Dispatcher<>(executor, permits);
    }

    /** Returns a dispatcher that runs tasks on the default executor without a limiter. */
    static <T> Dispatcher<T> virtual() {
        return new Dispatcher<>();
    }

    /** Returns a dispatcher that runs tasks on the default executor with at most {@code permits} in flight. */
    static <T> Dispatcher<T> virtual(int permits) {
        return new Dispatcher<>(permits);
    }

    /**
     * Launches the dispatcher thread; only the first call has any effect.
     *
     * <p>The thread repeatedly acquires a permit (when a limiter is configured), takes the next
     * queued task and hands it to the executor, releasing the permit once the task has run.
     * It ends when it takes the sentinel queued by {@link #stop()} or when anything throws,
     * in which case the failure is passed to {@code handle}.
     */
    void start() {
        if (!started.getAndSet(true)) {
            Thread.ofVirtual().start(() -> {
                try {
                    while (true) {
                        // An interruption here propagates to the outer catch and ends the loop.
                        // Swallowing it and carrying on would run a task without a permit and
                        // later release a permit that was never acquired.
                        if (limiter != null) {
                            limiter.acquire();
                        }

                        Runnable task = workingQueue.take();
                        if (task == POISON_PILL) {
                            break;
                        }

                        retry(() -> executor.execute(() -> {
                            try {
                                task.run();
                            } finally {
                                if (limiter != null) {
                                    limiter.release();
                                }
                            }
                        }));
                    }
                } catch (Throwable e) {
                    handle(e);
                }
            });
        }
    }

    /**
     * Queues the sentinel that makes the dispatcher thread leave its loop.
     *
     * <p>If the calling thread is interrupted while queueing, the completion signaller is
     * failed with that exception and the thread's interrupt flag is restored so the caller
     * can still observe the interruption.
     */
    void stop() {
        try {
            workingQueue.put(POISON_PILL);
        } catch (InterruptedException e) {
            completionSignaller.completeExceptionally(e);
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {@code true} once {@link #start()} has been called. */
    boolean isRunning() {
        return started.get();
    }

    /**
     * Queues a task that completes the returned future with the supplier's result.
     *
     * <p>The returned future is also completed exceptionally and cancelled as soon as the
     * completion signaller fails.
     *
     * @param supplier computation to run on the executor
     * @return future that receives the supplier's result
     */
    CompletableFuture<T> enqueue(Supplier<T> supplier) {
        InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>();
        workingQueue.add(completionTask(supplier, future));
        completionSignaller.exceptionally(shortcircuit(future));
        return future;
    }

    /**
     * Wraps the supplier in a {@link FutureTask} and registers that task as the backing
     * task of {@code future}, so cancelling the future also cancels the task.
     */
    private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) {
        FutureTask<Void> task = new FutureTask<>(() -> {
            try {
                // After a failure elsewhere the supplier is skipped altogether.
                if (!shortCircuited) {
                    future.complete(supplier.get());
                }
            } catch (Throwable e) {
                handle(e);
            }
        }, null);
        future.completedBy(task);
        return task;
    }

    /** Marks the dispatcher as short-circuited and fails the completion signaller with {@code e}. */
    private void handle(Throwable e) {
        shortCircuited = true;
        completionSignaller.completeExceptionally(e);
    }

    /** Builds the callback that fails and then cancels {@code future} when the signaller fails. */
    private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) {
        return throwable -> {
            future.completeExceptionally(throwable);
            future.cancel(true);
            return null;
        };
    }

    /** A {@link CompletableFuture} whose cancellation is forwarded to the task that completes it. */
    static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {

        private volatile FutureTask<?> backingTask;

        /** Records the task whose cancellation should follow this future's cancellation. */
        private void completedBy(FutureTask<Void> task) {
            backingTask = task;
        }

        /**
         * Cancels the backing task, if one has been registered, and then this future.
         *
         * @return the result of {@link CompletableFuture#cancel(boolean)} for this future
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (backingTask != null) {
                backingTask.cancel(mayInterruptIfRunning);
            }
            return super.cancel(mayInterruptIfRunning);
        }
    }

    /** Returns a new virtual-thread-per-task executor. */
    private static ExecutorService defaultExecutorService() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }

    /**
     * Rejects {@link ThreadPoolExecutor}s configured with a discarding rejection handler.
     *
     * @throws IllegalArgumentException if the handler is a {@code DiscardPolicy} or a
     *                                  {@code DiscardOldestPolicy}
     */
    private static void requireValidExecutor(Executor executor) {
        if (executor instanceof ThreadPoolExecutor tpe) {
            switch (tpe.getRejectedExecutionHandler()) {
                case ThreadPoolExecutor.DiscardPolicy __ ->
                  throw new IllegalArgumentException("Executor's RejectedExecutionHandler can't discard tasks");
                case ThreadPoolExecutor.DiscardOldestPolicy __ ->
                  throw new IllegalArgumentException("Executor's RejectedExecutionHandler can't discard tasks");
                default -> {
                    // no-op
                }
            }
        }
    }

    /**
     * Runs {@code runnable}; if it throws {@link RejectedExecutionException}, runs it exactly
     * once more. A second rejection propagates to the caller.
     */
    private static void retry(Runnable runnable) {
        try {
            runnable.run();
        } catch (RejectedExecutionException e) {
            Thread.onSpinWait();
            runnable.run();
        }
    }
}

Mutations

22

1.1
Location : lambda$static$0
Killed by : none
removed call to java/io/PrintStream::println → NO_COVERAGE

40

1.1
Location : <init>
Killed by : com.pivovarit.collectors.functional.ExecutorValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorValidationTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#4]
removed call to com/pivovarit/collectors/Dispatcher::requireValidExecutor → KILLED

51

1.1
Location : from
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#33]
replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED

55

1.1
Location : virtual
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#51]
replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED

59

1.1
Location : virtual
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#63]
replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED

63

1.1
Location : start
Killed by : none
negated conditional → TIMED_OUT

68

1.1
Location : lambda$start$3
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#17]
negated conditional → KILLED

69

1.1
Location : lambda$start$3
Killed by : com.pivovarit.collectors.functional.ExecutorPollutionTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorPollutionTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#7]
removed call to java/util/concurrent/Semaphore::acquire → KILLED

72

1.1
Location : lambda$start$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE

75

1.1
Location : lambda$start$3
Killed by : none
negated conditional → TIMED_OUT

76

1.1
Location : lambda$start$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT

77

1.1
Location : lambda$start$2
Killed by : none
removed call to java/util/concurrent/Executor::execute → TIMED_OUT

79

1.1
Location : lambda$start$1
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

81

1.1
Location : lambda$start$1
Killed by : none
negated conditional → TIMED_OUT

82

1.1
Location : lambda$start$1
Killed by : none
removed call to java/util/concurrent/Semaphore::release → TIMED_OUT

92

1.1
Location : lambda$start$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT

100

1.1
Location : stop
Killed by : none
removed call to java/util/concurrent/BlockingQueue::put → SURVIVED

107

1.1
Location : isRunning
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT

2.2
Location : isRunning
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED

114

1.1
Location : enqueue
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED

120

1.1
Location : lambda$completionTask$4
Killed by : none
negated conditional → TIMED_OUT

124

1.1
Location : lambda$completionTask$4
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT

127

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#56]
removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED

128

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED

137

1.1
Location : shortcircuit
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED

154

1.1
Location : cancel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#56]
negated conditional → KILLED

157

1.1
Location : cancel
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED

2.2
Location : cancel
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED

162

1.1
Location : defaultExecutorService
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/Dispatcher::defaultExecutorService → KILLED

166

1.1
Location : requireValidExecutor
Killed by : com.pivovarit.collectors.functional.ExecutorValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorValidationTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#4]
negated conditional → KILLED

181

1.1
Location : retry
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

183

1.1
Location : retry
Killed by : none
removed call to java/lang/Thread::onSpinWait → SURVIVED

184

1.1
Location : retry
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

Active mutators

Tests examined


Report generated by PIT 1.16.1