Dispatcher.java

1
package com.pivovarit.collectors;
2
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.CompletableFuture;
5
import java.util.concurrent.Executor;
6
import java.util.concurrent.FutureTask;
7
import java.util.concurrent.LinkedBlockingQueue;
8
import java.util.concurrent.RejectedExecutionException;
9
import java.util.concurrent.Semaphore;
10
import java.util.concurrent.ThreadFactory;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
15
import static com.pivovarit.collectors.Preconditions.requireValidExecutor;
16
17
/**
18
 * @author Grzegorz Piwowarek
19
 */
20
final class Dispatcher<T> {
21
22
    private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?");
23
24
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();
25
    private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>();
26
27
    private final ThreadFactory dispatcherThreadFactory = Thread::startVirtualThread;
28
29
    private final Executor executor;
30
    private final Semaphore limiter;
31
32
    private final AtomicBoolean started = new AtomicBoolean(false);
33
34
    Dispatcher(Executor executor, int permits) {
35 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
36
        this.executor = executor;
37
        this.limiter = new Semaphore(permits);
38
    }
39
40
    Dispatcher(Executor executor) {
41 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
42
        this.executor = executor;
43
        this.limiter = null;
44
    }
45
46
    void start() {
47 1 1. start : negated conditional → TIMED_OUT
        if (!started.getAndSet(true)) {
48
            dispatcherThreadFactory.newThread(() -> {
49
                try {
50
                    while (true) {
51
                        try {
52 1 1. lambda$start$0 : negated conditional → KILLED
                            if (limiter != null) {
53 1 1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED
                                limiter.acquire();
54
                            }
55
                        } catch (InterruptedException e) {
56 1 1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → NO_COVERAGE
                            handleException(e);
57
                        }
58
                        Runnable task;
59 1 1. lambda$start$0 : negated conditional → TIMED_OUT
                        if ((task = workingQueue.take()) != POISON_PILL) {
60 2 1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT
2. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT
                            retry(() -> executor.execute(() -> {
61
                                try {
62 1 1. lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT
                                    task.run();
63
                                } finally {
64 1 1. lambda$start$2 : negated conditional → TIMED_OUT
                                    if (limiter != null) {
65 1 1. lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT
                                        limiter.release();
66
                                    }
67
                                }
68
                            }));
69
                        } else {
70
                            break;
71
                        }
72
                    }
73
                } catch (Throwable e) {
74 1 1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → KILLED
                    handleException(e);
75
                }
76
            });
77
        }
78
    }
79
80
    void stop() {
81
        try {
82 1 1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED
            workingQueue.put(POISON_PILL);
83
        } catch (InterruptedException e) {
84
            completionSignaller.completeExceptionally(e);
85
        }
86
    }
87
88
    boolean isRunning() {
89 2 1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED
2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT
        return started.get();
90
    }
91
92
    CompletableFuture<T> enqueue(Supplier<T> supplier) {
93
        InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>();
94
        workingQueue.add(completionTask(supplier, future));
95
        completionSignaller.exceptionally(shortcircuit(future));
96 1 1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED
        return future;
97
    }
98
99
    private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) {
100
        FutureTask<Void> task = new FutureTask<>(() -> {
101
            try {
102
                future.complete(supplier.get());
103
            } catch (Throwable e) {
104 1 1. lambda$completionTask$0 : removed call to com/pivovarit/collectors/Dispatcher::handleException → TIMED_OUT
                handleException(e);
105
            }
106
        }, null);
107 1 1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED
        future.completedBy(task);
108 1 1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED
        return task;
109
    }
110
111
    private void handleException(Throwable e) {
112
        completionSignaller.completeExceptionally(e);
113
114
        for (Runnable runnable : workingQueue) {
115 1 1. handleException : negated conditional → SURVIVED
            if (runnable instanceof FutureTask<?> task) {
116
                task.cancel(true);
117
            }
118
        }
119
    }
120
121
    private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) {
122 1 1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED
        return throwable -> {
123
            future.completeExceptionally(throwable);
124
            future.cancel(true);
125
            return null;
126
        };
127
    }
128
129
    static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {
130
131
        private volatile FutureTask<?> backingTask;
132
133
        private void completedBy(FutureTask<Void> task) {
134
            backingTask = task;
135
        }
136
137
        @Override
138
        public boolean cancel(boolean mayInterruptIfRunning) {
139 1 1. cancel : negated conditional → KILLED
            if (backingTask != null) {
140
                backingTask.cancel(mayInterruptIfRunning);
141
            }
142 2 1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
            return super.cancel(mayInterruptIfRunning);
143
        }
144
    }
145
146
    private static void retry(Runnable runnable) {
147
        try {
148 1 1. retry : removed call to java/lang/Runnable::run → TIMED_OUT
            runnable.run();
149
        } catch (RejectedExecutionException e) {
150 1 1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED
            Thread.onSpinWait();
151 1 1. retry : removed call to java/lang/Runnable::run → KILLED
            runnable.run();
152
        }
153
    }
154
}

Mutations

35

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

41

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

47

1.1
Location : start
Killed by : none
negated conditional → TIMED_OUT

52

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
negated conditional → KILLED

53

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.ExecutorPollutionTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ExecutorPollutionTest]/[test-factory:shouldNotPolluteExecutorFactoryLimitedParallelism()]/[dynamic-test:#2]
removed call to java/util/concurrent/Semaphore::acquire → KILLED

56

1.1
Location : lambda$start$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handleException → NO_COVERAGE

59

1.1
Location : lambda$start$0
Killed by : none
negated conditional → TIMED_OUT

60

1.1
Location : lambda$start$1
Killed by : none
removed call to java/util/concurrent/Executor::execute → TIMED_OUT

2.2
Location : lambda$start$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT

62

1.1
Location : lambda$start$2
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

64

1.1
Location : lambda$start$2
Killed by : none
negated conditional → TIMED_OUT

65

1.1
Location : lambda$start$2
Killed by : none
removed call to java/util/concurrent/Semaphore::release → TIMED_OUT

74

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#4]
removed call to com/pivovarit/collectors/Dispatcher::handleException → KILLED

82

1.1
Location : stop
Killed by : none
removed call to java/util/concurrent/BlockingQueue::put → SURVIVED
Covering tests

89

1.1
Location : isRunning
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT

2.2
Location : isRunning
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED
Covering tests

96

1.1
Location : enqueue
Killed by : com.pivovarit.collectors.test.NonBlockingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.NonBlockingTest]/[test-factory:shouldNotBlockTheCallingThread()]/[dynamic-test:#2]
replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED

104

1.1
Location : lambda$completionTask$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handleException → TIMED_OUT

107

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.test.ExceptionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ExceptionHandlingTest]/[test-factory:shouldShortcircuitOnException()]/[dynamic-test:#15]
removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED

108

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.test.NonBlockingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.NonBlockingTest]/[test-factory:shouldNotBlockTheCallingThread()]/[dynamic-test:#2]
replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED

115

1.1
Location : handleException
Killed by : none
negated conditional → SURVIVED
Covering tests

122

1.1
Location : shortcircuit
Killed by : com.pivovarit.collectors.test.NonBlockingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.NonBlockingTest]/[test-factory:shouldNotBlockTheCallingThread()]/[dynamic-test:#2]
replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED

139

1.1
Location : cancel
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#5]
negated conditional → KILLED

142

1.1
Location : cancel
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
Covering tests

2.2
Location : cancel
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
Covering tests

148

1.1
Location : retry
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

150

1.1
Location : retry
Killed by : none
removed call to java/lang/Thread::onSpinWait → SURVIVED
Covering tests

151

1.1
Location : retry
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#4]
removed call to java/lang/Runnable::run → KILLED

Active mutators

Tests examined


Report generated by PIT 1.21.0