Dispatcher.java

1
package com.pivovarit.collectors;
2
3
import java.util.concurrent.CompletableFuture;
4
import java.util.concurrent.Executor;
5
import java.util.concurrent.Executors;
6
import java.util.concurrent.FutureTask;
7
import java.util.concurrent.Semaphore;
8
import java.util.function.BiConsumer;
9
import java.util.function.Supplier;
10
11
/**
12
 * @author Grzegorz Piwowarek
13
 */
14
final class Dispatcher<T> {
15
16
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();
17
    private final Executor executor;
18
    private final Semaphore limiter;
19
20
    private Dispatcher() {
21
        this.executor = Executors.newVirtualThreadPerTaskExecutor();
22
        this.limiter = null;
23
    }
24
25
    private Dispatcher(Executor executor, int permits) {
26
        this.executor = executor;
27
        this.limiter = new Semaphore(permits);
28
    }
29
30
    static <T> Dispatcher<T> from(Executor executor, int permits) {
31 1 1. from : replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED
        return new Dispatcher<>(executor, permits);
32
    }
33
34
    static <T> Dispatcher<T> virtual() {
35 1 1. virtual : replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED
        return new Dispatcher<>();
36
    }
37
38
    CompletableFuture<T> enqueue(Supplier<T> supplier) {
39
        InterruptibleCompletableFuture<T> future = new InterruptibleCompletableFuture<>();
40
        completionSignaller.whenComplete(shortcircuit(future));
41
        try {
42 1 1. enqueue : removed call to java/util/concurrent/Executor::execute → TIMED_OUT
            executor.execute(completionTask(supplier, future));
43
        } catch (Throwable e) {
44
            completionSignaller.completeExceptionally(e);
45 1 1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED
            return CompletableFuture.failedFuture(e);
46
        }
47 1 1. enqueue : replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED
        return future;
48
    }
49
50
    private FutureTask<T> completionTask(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) {
51
        FutureTask<T> task = new FutureTask<>(() -> {
52 1 1. lambda$completionTask$0 : negated conditional → TIMED_OUT
            if (!completionSignaller.isCompletedExceptionally()) {
53
                try {
54 1 1. lambda$completionTask$0 : negated conditional → KILLED
                    if (limiter == null) {
55
                        future.complete(supplier.get());
56
                    } else {
57
                        try {
58 1 1. lambda$completionTask$0 : removed call to java/util/concurrent/Semaphore::acquire → KILLED
                            limiter.acquire();
59
                            future.complete(supplier.get());
60
                        } finally {
61 1 1. lambda$completionTask$0 : removed call to java/util/concurrent/Semaphore::release → TIMED_OUT
                            limiter.release();
62
                        }
63
                    }
64
                } catch (Throwable e) {
65
                    completionSignaller.completeExceptionally(e);
66
                }
67
            }
68
        }, null);
69 1 1. completionTask : removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED
        future.completedBy(task);
70 1 1. completionTask : replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED
        return task;
71
    }
72
73
    private static <T> BiConsumer<T, Throwable> shortcircuit(InterruptibleCompletableFuture<?> future) {
74 1 1. shortcircuit : replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED
        return (__, throwable) -> {
75 1 1. lambda$shortcircuit$1 : negated conditional → TIMED_OUT
            if (throwable != null) {
76
                future.completeExceptionally(throwable);
77
                future.cancel(true);
78
            }
79
        };
80
    }
81
82
    static final class InterruptibleCompletableFuture<T> extends CompletableFuture<T> {
83
84
        private volatile FutureTask<T> backingTask;
85
86
        private void completedBy(FutureTask<T> task) {
87
            backingTask = task;
88
        }
89
90
        @Override
91
        public boolean cancel(boolean mayInterruptIfRunning) {
92
            var task = backingTask;
93 1 1. cancel : negated conditional → KILLED
            if (task != null) {
94
                task.cancel(mayInterruptIfRunning);
95
            }
96 2 1. cancel : replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
2. cancel : replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED
            return super.cancel(mayInterruptIfRunning);
97
        }
98
    }
99
}

Mutations

31

1.1
Location : from
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#42]
replaced return value with null for com/pivovarit/collectors/Dispatcher::from → KILLED

35

1.1
Location : virtual
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/Dispatcher::virtual → KILLED

42

1.1
Location : enqueue
Killed by : none
removed call to java/util/concurrent/Executor::execute → TIMED_OUT

45

1.1
Location : enqueue
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#62]
replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED

47

1.1
Location : enqueue
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#42]
replaced return value with null for com/pivovarit/collectors/Dispatcher::enqueue → KILLED

52

1.1
Location : lambda$completionTask$0
Killed by : none
negated conditional → TIMED_OUT

54

1.1
Location : lambda$completionTask$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#10]
negated conditional → KILLED

58

1.1
Location : lambda$completionTask$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#35]
removed call to java/util/concurrent/Semaphore::acquire → KILLED

61

1.1
Location : lambda$completionTask$0
Killed by : none
removed call to java/util/concurrent/Semaphore::release → TIMED_OUT

69

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#33]
removed call to com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::completedBy → KILLED

70

1.1
Location : completionTask
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#42]
replaced return value with null for com/pivovarit/collectors/Dispatcher::completionTask → KILLED

74

1.1
Location : shortcircuit
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#42]
replaced return value with null for com/pivovarit/collectors/Dispatcher::shortcircuit → KILLED

75

1.1
Location : lambda$shortcircuit$1
Killed by : none
negated conditional → TIMED_OUT

93

1.1
Location : cancel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#33]
negated conditional → KILLED

96

1.1
Location : cancel
Killed by : none
replaced boolean return with false for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED

2.2
Location : cancel
Killed by : none
replaced boolean return with true for com/pivovarit/collectors/Dispatcher$InterruptibleCompletableFuture::cancel → SURVIVED

Active mutators

Tests examined


Report generated by PIT 1.16.0