Dispatcher.java

1
package com.pivovarit.collectors;
2
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.CompletableFuture;
5
import java.util.concurrent.Executor;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.FutureTask;
9
import java.util.concurrent.LinkedBlockingQueue;
10
import java.util.concurrent.Semaphore;
11
import java.util.concurrent.SynchronousQueue;
12
import java.util.concurrent.ThreadPoolExecutor;
13
import java.util.concurrent.TimeUnit;
14
import java.util.concurrent.atomic.AtomicBoolean;
15
import java.util.function.Function;
16
import java.util.function.Supplier;
17
18
import static java.lang.Runtime.getRuntime;
19
20
/**
 * Hands queued tasks over to a caller-supplied {@link Executor} from a single dedicated
 * dispatcher thread, using a {@link Semaphore} so that no more than {@code permits} tasks
 * have been handed over without having finished.
 *
 * <p>Tasks are buffered in an unbounded queue by {@link #enqueue(Supplier)}, drained by the
 * loop launched in {@link #start()}, and the loop is terminated by the sentinel that
 * {@link #stop()} appends to the queue. The first failure observed anywhere (in the dispatch
 * loop or inside a task) short-circuits the whole dispatcher: every future returned by
 * {@code enqueue} is completed exceptionally and its backing task is cancelled.
 *
 * @param <T> type of the results produced by the enqueued suppliers
 * @author Grzegorz Piwowarek
 */
final class Dispatcher<T> {

    /**
     * Queue sentinel telling the dispatch loop to exit. It is only ever compared by identity
     * against dequeued items; the dispatch loop never runs it.
     */
    private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?");

    /** Completed exceptionally on the first failure; every enqueued future listens to it. */
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();

    /** Tasks waiting to be handed over to {@link #executor}. */
    private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>();

    /** Single-threaded executor hosting the dispatch loop. */
    private final ExecutorService dispatcher = newLazySingleThreadExecutor();

    /** Caller-supplied executor that actually runs the tasks. */
    private final Executor executor;

    /** One permit is held from hand-over until the task has finished running. */
    private final Semaphore limiter;

    /** Flipped to {@code true} by the first {@link #start()} call and never reset. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Once {@code true}, tasks that have not begun yet skip their supplier. */
    private volatile boolean shortCircuited = false;

    private Dispatcher(Executor executor, int permits) {
        this.executor = executor;
        this.limiter = new Semaphore(permits);
    }

    /**
     * Creates a dispatcher that has not been started yet.
     *
     * @param executor executor the tasks are handed over to
     * @param permits  maximum number of handed-over, unfinished tasks
     * @param <T>      type of the results produced by the enqueued suppliers
     * @return a new dispatcher instance
     */
    static <T> Dispatcher<T> of(Executor executor, int permits) {
        return new Dispatcher<>(executor, permits);
    }

    /**
     * Launches the dispatch loop on the dispatcher thread. Only the first invocation has any
     * effect; later calls return immediately.
     *
     * <p>The loop blocks on the queue, acquires a permit for every task it takes, and submits
     * the task wrapped so that the permit is released once the task has run. It exits when it
     * dequeues the sentinel; any {@link Throwable} raised inside the loop is passed to
     * {@link #handle(Throwable)}.
     */
    void start() {
        if (!started.getAndSet(true)) {
            dispatcher.execute(() -> {
                try {
                    while (true) {
                        Runnable task;
                        if ((task = workingQueue.take()) != POISON_PILL) {
                            limiter.acquire();
                            executor.execute(withFinally(task, limiter::release));
                        } else {
                            break;
                        }
                    }
                } catch (Throwable e) {
                    handle(e);
                }
            });
        }
    }

    /**
     * Appends the termination sentinel to the queue and initiates an orderly shutdown of the
     * dispatcher thread, so tasks enqueued before this call are still handed over.
     *
     * <p>If the calling thread is interrupted while enqueuing the sentinel, the interrupt
     * flag is restored for the caller and the failure is routed through
     * {@link #handle(Throwable)}. The sentinel never reached the queue in that case, so the
     * dispatcher thread has to be interrupted; otherwise it could stay parked on an empty
     * queue forever.
     */
    void stop() {
        try {
            workingQueue.put(POISON_PILL);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: callers further up the stack must see it.
            Thread.currentThread().interrupt();
            handle(e);
        } finally {
            dispatcher.shutdown();
        }
    }

    /**
     * Reports whether {@link #start()} has been called. The flag is never cleared, so this
     * keeps returning {@code true} after {@link #stop()}.
     *
     * @return {@code true} once the dispatcher has been started
     */
    boolean isRunning() {
        return started.get();
    }

    /**
     * Queues a computation and returns the future that will carry its result.
     *
     * <p>The future is also subscribed to the failure signal, so it completes exceptionally
     * (and its backing task is cancelled) as soon as the dispatcher short-circuits.
     *
     * @param supplier computation to run on the target executor
     * @return future completed with the supplier's result, or exceptionally on failure
     */
    CompletableFuture<T> enqueue(Supplier<T> supplier) {
        InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>();
        workingQueue.add(completionTask(supplier, future));
        completionSignaller.exceptionally(shortcircuit(future));
        return future;
    }

    /**
     * Wraps the supplier in a {@link FutureTask} that completes {@code future} with the
     * supplier's result, and links the task to the future so cancelling the future can
     * cancel the task. The supplier is skipped when the dispatcher is already
     * short-circuited; anything it throws is passed to {@link #handle(Throwable)}.
     */
    private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) {
        FutureTask<Void> task = new FutureTask<>(() -> {
            try {
                if (!shortCircuited) {
                    future.complete(supplier.get());
                }
            } catch (Throwable e) {
                handle(e);
            }
        }, null);
        future.completedBy(task);
        return task;
    }

    /**
     * Short-circuits the dispatcher: marks it as failed, propagates {@code e} to every
     * subscribed future through the completion signaller, and interrupts the dispatcher
     * thread via {@link ExecutorService#shutdownNow()}.
     */
    private void handle(Throwable e) {
        shortCircuited = true;
        completionSignaller.completeExceptionally(e);
        dispatcher.shutdownNow();
    }

    /**
     * Builds the callback attached to the completion signaller for a single future: it fails
     * the future with the signalled throwable and then cancels it, interrupting the backing
     * task if it is running.
     */
    private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) {
        return throwable -> {
            future.completeExceptionally(throwable);
            future.cancel(true);
            return null;
        };
    }

    /** Returns a runnable that runs {@code task} and then always runs {@code finisher}. */
    private static Runnable withFinally(Runnable task, Runnable finisher) {
        return () -> {
            try {
                task.run();
            } finally {
                finisher.run();
            }
        };
    }

    /**
     * Computes the default parallelism: one less than the number of available processors,
     * but never less than 4.
     *
     * @return the default parallelism level
     */
    static int getDefaultParallelism() {
        return Math.max(getRuntime().availableProcessors() - 1, 4);
    }

    /**
     * Creates the executor hosting the dispatch loop: zero core threads, at most one thread,
     * zero keep-alive and a {@link SynchronousQueue}, so the thread is only created when a
     * task is submitted. The thread is non-daemon and its default name is prefixed with
     * {@code parallel-collector-}.
     */
    private static ThreadPoolExecutor newLazySingleThreadExecutor() {
        return new ThreadPoolExecutor(0, 1,
          0L, TimeUnit.MILLISECONDS,
          new SynchronousQueue<>(),
          task -> {
              Thread thread = Executors.defaultThreadFactory().newThread(task);
              thread.setName("parallel-collector-" + thread.getName());
              thread.setDaemon(false);
              return thread;
          });
    }

    /**
     * A {@link CompletableFuture} that remembers the {@link FutureTask} computing its value,
     * so that cancelling the future also cancels (and optionally interrupts) that task.
     *
     * @param <T> type of the future's value
     */
    static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {

        /** Task producing this future's value; {@code null} until {@link #completedBy} runs. */
        private volatile FutureTask<?> backingTask;

        /** Links this future to the task that will complete it. */
        private void completedBy(FutureTask<Void> task) {
            backingTask = task;
        }

        /**
         * Cancels the backing task first (when one has been linked) and then this future.
         *
         * @param mayInterruptIfRunning forwarded to the backing task's {@code cancel}
         * @return the result of {@link CompletableFuture#cancel(boolean)} for this future
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (backingTask != null) {
                backingTask.cancel(mayInterruptIfRunning);
            }
            return super.cancel(mayInterruptIfRunning);
        }
    }
}

Mutations

25

1.1
Location : lambda$static$0
Killed by : none
removed call to java/io/PrintStream::println → NO_COVERAGE

45

1.1
Location : of
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#18]
replaced return value with null for com/pivovarit/collectors/Dispatcher::of → KILLED

49

1.1
Location : start
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
negated conditional → KILLED

50

1.1
Location : start
Killed by : none
removed call to java/util/concurrent/ExecutorService::execute → TIMED_OUT

54

1.1
Location : lambda$start$1
Killed by : none
negated conditional → TIMED_OUT

55

1.1
Location : lambda$start$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#40]
removed call to java/util/concurrent/Semaphore::acquire → KILLED

56

1.1
Location : lambda$start$1
Killed by : none
removed call to java/util/concurrent/Executor::execute → TIMED_OUT

62

1.1
Location : lambda$start$1
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT

70

1.1
Location : stop
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#38]
removed call to java/util/concurrent/BlockingQueue::put → KILLED

74

1.1
Location : stop
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#38]
removed call to java/util/concurrent/ExecutorService::shutdown → KILLED

79

1.1
Location : isRunning
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher::isRunning → SURVIVED

2.2
Location : isRunning
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher::isRunning → TIMED_OUT

86

1.1
Location : enqueue
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED

92

1.1
Location : lambda$completionTask$2
Killed by : none
negated conditional → TIMED_OUT

96

1.1
Location : lambda$completionTask$2
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::handle → TIMED_OUT

99

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#43]
removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::access$000 → KILLED

100

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED

110

1.1
Location : shortcircuit
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED

118

1.1
Location : withFinally
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/Dispatcher::withFinally → KILLED

120

1.1
Location : lambda$withFinally$4
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

122

1.1
Location : lambda$withFinally$4
Killed by : none
removed call to java/lang/Runnable::run → TIMED_OUT

128

1.1
Location : getDefaultParallelism
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

2.2
Location : getDefaultParallelism
Killed by : none
replaced int return with 0 for com/pivovarit/collectors/Dispatcher::getDefaultParallelism → NO_COVERAGE

132

1.1
Location : newLazySingleThreadExecutor
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#18]
replaced return value with null for com/pivovarit/collectors/Dispatcher::newLazySingleThreadExecutor → KILLED

137

1.1
Location : lambda$newLazySingleThreadExecutor$5
Killed by : none
removed call to java/lang/Thread::setName → SURVIVED

138

1.1
Location : lambda$newLazySingleThreadExecutor$5
Killed by : none
removed call to java/lang/Thread::setDaemon → SURVIVED

139

1.1
Location : lambda$newLazySingleThreadExecutor$5
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/Dispatcher::lambda$newLazySingleThreadExecutor$5 → KILLED

152

1.1
Location : cancel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#43]
negated conditional → KILLED

155

1.1
Location : cancel
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED

2.2
Location : cancel
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED

Active mutators

Tests examined


Report generated by PIT 1.9.8