Dispatcher.java

1
package com.pivovarit.collectors;
2
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.CompletableFuture;
5
import java.util.concurrent.Executor;
6
import java.util.concurrent.FutureTask;
7
import java.util.concurrent.LinkedBlockingQueue;
8
import java.util.concurrent.RejectedExecutionException;
9
import java.util.concurrent.Semaphore;
10
import java.util.concurrent.ThreadFactory;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
15
import static com.pivovarit.collectors.Preconditions.requireValidExecutor;
16
17
/**
18
 * @author Grzegorz Piwowarek
19
 */
20
final class Dispatcher<T> {
21
22
    private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?");
23
24
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();
25
    private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>();
26
27
    private final ThreadFactory dispatcherThreadFactory = Thread::startVirtualThread;
28
29
    private final Executor executor;
30
    private final Semaphore limiter;
31
32
    private final AtomicBoolean started = new AtomicBoolean(false);
33
34
    private volatile boolean shortCircuited = false;
35
36
    Dispatcher(Executor executor, int permits) {
37 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
38
        this.executor = executor;
39
        this.limiter = new Semaphore(permits);
40
    }
41
42
    Dispatcher(Executor executor) {
43 1 1. <init> : removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
        requireValidExecutor(executor);
44
        this.executor = executor;
45
        this.limiter = null;
46
    }
47
48
    void start() {
49 1 1. start : negated conditional → TIMED_OUT
        if (!started.getAndSet(true)) {
50
            dispatcherThreadFactory.newThread(() -> {
51
                try {
52
                    while (true) {
53
                        try {
54 1 1. lambda$start$0 : negated conditional → KILLED
                            if (limiter != null) {
55 1 1. lambda$start$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED
                                limiter.acquire();
56
                            }
57
                        } catch (InterruptedException e) {
58 1 1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE
                            handle(e);
59
                        }
60
                        Runnable task;
61 1 1. lambda$start$0 : negated conditional → TIMED_OUT
                        if ((task = workingQueue.take()) != POISON_PILL) {
62 2 1. lambda$start$1 : removed call to java/util/concurrent/Executor::execute → TIMED_OUT
2. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT
                            retry(() -> executor.execute(() -> {
63
                                try {
64 1 1. lambda$start$2 : removed call to java/lang/Runnable::run → TIMED_OUT
                                    task.run();
65
                                } finally {
66 1 1. lambda$start$2 : negated conditional → TIMED_OUT
                                    if (limiter != null) {
67 1 1. lambda$start$2 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT
                                        limiter.release();
68
                                    }
69
                                }
70
                            }));
71
                        } else {
72
                            break;
73
                        }
74
                    }
75
                } catch (Throwable e) {
76 1 1. lambda$start$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → KILLED
                    handle(e);
77
                }
78
            });
79
        }
80
    }
81
82
    void stop() {
83
        try {
84 1 1. stop : removed call to java/util/concurrent/BlockingQueue::put → SURVIVED
            workingQueue.put(POISON_PILL);
85
        } catch (InterruptedException e) {
86
            completionSignaller.completeExceptionally(e);
87
        }
88
    }
89
90
    boolean isRunning() {
91 2 1. isRunning : replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED
2. isRunning : replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT
        return started.get();
92
    }
93
94
    CompletableFuture<T> enqueue(Supplier<T> supplier) {
95
        InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>();
96
        workingQueue.add(completionTask(supplier, future));
97
        completionSignaller.exceptionally(shortcircuit(future));
98 1 1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED
        return future;
99
    }
100
101
    private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) {
102
        FutureTask<Void> task = new FutureTask<>(() -> {
103
            try {
104 1 1. lambda$completionTask$0 : negated conditional → TIMED_OUT
                if (!shortCircuited) {
105
                    future.complete(supplier.get());
106
                }
107
            } catch (Throwable e) {
108 1 1. lambda$completionTask$0 : removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT
                handle(e);
109
            }
110
        }, null);
111 1 1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED
        future.completedBy(task);
112 1 1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED
        return task;
113
    }
114
115
    private void handle(Throwable e) {
116
        shortCircuited = true;
117
        completionSignaller.completeExceptionally(e);
118
    }
119
120
    private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) {
121 1 1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED
        return throwable -> {
122
            future.completeExceptionally(throwable);
123
            future.cancel(true);
124
            return null;
125
        };
126
    }
127
128
    static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {
129
130
        private volatile FutureTask<?> backingTask;
131
132
        private void completedBy(FutureTask<Void> task) {
133
            backingTask = task;
134
        }
135
136
        @Override
137
        public boolean cancel(boolean mayInterruptIfRunning) {
138 1 1. cancel : negated conditional → KILLED
            if (backingTask != null) {
139
                backingTask.cancel(mayInterruptIfRunning);
140
            }
141 2 1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
            return super.cancel(mayInterruptIfRunning);
142
        }
143
    }
144
145
    private static void retry(Runnable runnable) {
146
        try {
147 1 1. retry : removed call to java/lang/Runnable::run → TIMED_OUT
            runnable.run();
148
        } catch (RejectedExecutionException e) {
149 1 1. retry : removed call to java/lang/Thread::onSpinWait → SURVIVED
            Thread.onSpinWait();
150 1 1. retry : removed call to java/lang/Runnable::run → KILLED
            runnable.run();
151
        }
152
    }
153
}

Mutations

37

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

43

1.1
Location : <init>
Killed by : none
removed call to com/pivovarit/collectors/Preconditions::requireValidExecutor → SURVIVED
Covering tests

49

1.1
Location : start
Killed by : none
negated conditional → TIMED_OUT

54

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
negated conditional → KILLED

55

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.ExecutorPollutionTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ExecutorPollutionTest]/[test-factory:shouldNotPolluteExecutorFactoryLimitedParallelism()]/[dynamic-test:#2]
removed call to java/util/concurrent/Semaphore::acquire → KILLED

58

1.1
Location : lambda$start$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → NO_COVERAGE

61

1.1
Location : lambda$start$0
Killed by : none
negated conditional → TIMED_OUT

62

1.1
Location : lambda$start$1
Killed by : none
removed call to java/util/concurrent/Executor::execute → TIMED_OUT

2.2
Location : lambda$start$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::retry → TIMED_OUT

64

1.1
Location : lambda$start$2
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

66

1.1
Location : lambda$start$2
Killed by : none
negated conditional → TIMED_OUT

67

1.1
Location : lambda$start$2
Killed by : none
removed call to java/util/concurrent/Semaphore::release → TIMED_OUT

76

1.1
Location : lambda$start$0
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#4]
removed call to com/pivovarit/collectors/Dispatcher::handle → KILLED

84

1.1
Location : stop
Killed by : none
removed call to java/util/concurrent/BlockingQueue::put → SURVIVED
Covering tests

91

1.1
Location : isRunning
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT

2.2
Location : isRunning
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED
Covering tests

98

1.1
Location : enqueue
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED

104

1.1
Location : lambda$completionTask$0
Killed by : none
negated conditional → TIMED_OUT

108

1.1
Location : lambda$completionTask$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT

111

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#15]
removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED

112

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED

121

1.1
Location : shortcircuit
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED

138

1.1
Location : cancel
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#15]
negated conditional → KILLED

141

1.1
Location : cancel
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
Covering tests

2.2
Location : cancel
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED Covering tests

147

1.1
Location : retry
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

149

1.1
Location : retry
Killed by : none
removed call to java/lang/Thread::onSpinWait → SURVIVED
Covering tests

150

1.1
Location : retry
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#4]
removed call to java/lang/Runnable::run → KILLED

Active mutators

Tests examined


Report generated by PIT 1.19.4