AsyncParallelStreamingCollector.java

1
package com.pivovarit.collectors;
2
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.EnumSet;
6
import java.util.List;
7
import java.util.Set;
8
import java.util.concurrent.CompletableFuture;
9
import java.util.concurrent.Executor;
10
import java.util.function.BiConsumer;
11
import java.util.function.BinaryOperator;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
import java.util.stream.Collector;
15
import java.util.stream.Stream;
16
17
import static com.pivovarit.collectors.BatchingSpliterator.batching;
18
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
19
import static com.pivovarit.collectors.CompletionStrategy.ordered;
20
import static com.pivovarit.collectors.CompletionStrategy.unordered;
21
import static java.util.Collections.emptySet;
22
23
/**
24
 * @author Grzegorz Piwowarek
25
 */
26
class AsyncParallelStreamingCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
27
28
    // Mapping applied to each collected element; invoked inside the task enqueued by accumulator().
    private final Function<? super T, ? extends R> function;
29
30
    // Turns the accumulated list of futures into the resulting Stream; applied by finisher().
    private final CompletionStrategy<R> completionStrategy;
31
32
    // Characteristics derived from the completion strategy in the constructor; returned by characteristics().
    private final Set<Characteristics> characteristics;
33
34
    // Started and fed by accumulator(), stopped by finisher().
    private final Dispatcher<R> dispatcher;
35
36
    /**
     * Creates a collector that runs {@code function} for every element through {@code dispatcher}.
     *
     * @param function   mapping applied to each collected element
     * @param dispatcher dispatcher that is started, fed and stopped by this collector
     * @param ordered    {@code true} selects the ordered completion strategy, {@code false} the unordered one
     */
    private AsyncParallelStreamingCollector(
37
      Function<? super T, ? extends R> function,
38
      Dispatcher<R> dispatcher,
39
      boolean ordered) {
40 1 1. <init> : negated conditional → KILLED
        this.completionStrategy = ordered ? ordered() : unordered();
41
        // Only the unordered strategy advertises Characteristics.UNORDERED; the ordered one advertises nothing.
        this.characteristics = switch (completionStrategy) {
42
            case CompletionStrategy.Ordered<R> __ -> emptySet();
43
            case CompletionStrategy.Unordered<R> __ -> EnumSet.of(Characteristics.UNORDERED);
44
        };
45
        this.dispatcher = dispatcher;
46
        this.function = function;
47
    }
48
49
    /**
     * Creates a streaming collector whose dispatcher is built from {@code executor} and {@code parallelism}.
     *
     * @param function    mapping applied to each collected element
     * @param executor    executor handed to the new {@code Dispatcher}
     * @param parallelism value handed to the new {@code Dispatcher}
     *                    (presumably the maximum number of concurrently running tasks; confirm in Dispatcher)
     * @param ordered     {@code true} for the ordered completion strategy, {@code false} for the unordered one
     * @return a new collector producing a {@code Stream} of mapped results
     */
    public static <T, R> Collector<T, ?, Stream<R>> from(
50
      Function<? super T, ? extends R> function, Executor executor, int parallelism, boolean ordered) {
51 1 1. from : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED
        return new AsyncParallelStreamingCollector<>(function, new Dispatcher<>(executor, parallelism), ordered);
52
    }
53
54
    /**
     * Creates a streaming collector whose dispatcher is built from {@code executor} alone,
     * i.e. without an explicit parallelism argument.
     *
     * @param function mapping applied to each collected element
     * @param executor executor handed to the new {@code Dispatcher}
     * @param ordered  {@code true} for the ordered completion strategy, {@code false} for the unordered one
     * @return a new collector producing a {@code Stream} of mapped results
     */
    public static <T, R> Collector<T, ?, Stream<R>> from(
55
      Function<? super T, ? extends R> function, Executor executor, boolean ordered) {
56 1 1. from : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED
        return new AsyncParallelStreamingCollector<>(function, new Dispatcher<>(executor), ordered);
57
    }
58
59
    /**
     * Supplies the accumulation container: a fresh {@link ArrayList} that will hold one future per element.
     */
    @Override
60
    public Supplier<List<CompletableFuture<R>>> supplier() {
61 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::supplier → KILLED
        return ArrayList::new;
62
    }
63
64
    /**
     * Returns the accumulation step: starts the dispatcher, enqueues {@code function.apply(e)} for the
     * element and appends the future returned by the dispatcher to the accumulation list.
     */
    @Override
65
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
66 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::accumulator → KILLED
        return (acc, e) -> {
67 1 1. lambda$accumulator$0 : removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT
            // Called for every element. NOTE(review): presumably a no-op after the first call; confirm in Dispatcher.
            dispatcher.start();
68 1 1. lambda$accumulator$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$accumulator$1 → KILLED
            acc.add(dispatcher.enqueue(() -> function.apply(e)));
69
        };
70
    }
71
72
    /**
     * Returns a combiner that never merges anything: invoking it always throws
     * {@link UnsupportedOperationException}, signalling that this collector is not meant for parallel streams.
     */
    @Override
73
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
74 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::combiner → SURVIVED
        return (left, right) -> {
75
            throw new UnsupportedOperationException(
76
              "Using parallel stream with parallel collectors is a bad idea");
77
        };
78
    }
79
80
    /**
     * Returns the finishing step: stops the dispatcher, then lets the completion strategy turn the
     * accumulated futures into the resulting {@code Stream}.
     */
    @Override
81
    public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
82 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::finisher → KILLED
        return acc -> {
83 1 1. lambda$finisher$0 : removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
            // All elements have been enqueued by now, so the dispatcher is stopped before results are consumed.
            dispatcher.stop();
84 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$finisher$0 → KILLED
            return completionStrategy.apply(acc);
85
        };
86
    }
87
88
    /**
     * Returns the characteristics computed in the constructor: an empty set for the ordered strategy,
     * {@code UNORDERED} for the unordered one.
     */
    @Override
89
    public Set<Characteristics> characteristics() {
90 1 1. characteristics : replaced return value with Collections.emptySet for com/pivovarit/collectors/AsyncParallelStreamingCollector::characteristics → SURVIVED
        return characteristics;
91
    }
92
93
    /**
     * Collector that first gathers all elements into an {@link ArrayList} and only in its finisher hands
     * them to an {@code AsyncParallelStreamingCollector}, either one by one or grouped into batches.
     *
     * @param task        mapping applied to each collected element
     * @param executor    executor handed to the {@code Dispatcher} created in the finisher
     * @param parallelism value handed to the {@code Dispatcher} and to {@code partitioned(...)};
     *                    also compared against the number of collected elements
     * @param ordered     forwarded to the delegate collector to pick its completion strategy
     */
    record BatchingCollector<T, R>(Function<? super T, ? extends R> task, Executor executor, int parallelism,
94
                                           boolean ordered)
95
      implements Collector<T, ArrayList<T>, Stream<R>> {
96
97
        /** Supplies a fresh {@link ArrayList} used to buffer the raw elements. */
        @Override
98
        public Supplier<ArrayList<T>> supplier() {
99 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::supplier → KILLED
            return ArrayList::new;
100
        }
101
102
        /** Accumulates by appending the element to the buffer; no work is submitted at this point. */
        @Override
103
        public BiConsumer<ArrayList<T>, T> accumulator() {
104 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::accumulator → KILLED
            return ArrayList::add;
105
        }
106
107
        /** Merges two buffers by appending the right one to the left one and returning the left one. */
        @Override
108
        public BinaryOperator<ArrayList<T>> combiner() {
109 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::combiner → SURVIVED
            return (left, right) -> {
110
                left.addAll(right);
111 1 1. lambda$combiner$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$combiner$0 → NO_COVERAGE
                return left;
112
            };
113
        }
114
115
        /**
         * Returns the finishing step that submits the buffered elements through a new
         * {@code AsyncParallelStreamingCollector} backed by a new {@code Dispatcher}.
         */
        @Override
116
        public Function<ArrayList<T>, Stream<R>> finisher() {
117 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::finisher → KILLED
            return items -> {
118 1 1. lambda$finisher$0 : negated conditional → KILLED
                // Exactly `parallelism` elements: batching is skipped and each element is submitted on its own.
                if (items.size() == parallelism) {
119 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return items.stream()
120
                      .collect(new AsyncParallelStreamingCollector<>(task, new Dispatcher<>(executor, parallelism), ordered));
121
                } else {
122 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    // Otherwise the elements are grouped first and every group is processed by batching(task);
                    // the per-group result collections are flattened back into a single stream.
                    // NOTE(review): presumably partitioned(...) yields at most `parallelism` groups; confirm in BatchingSpliterator.
                    return partitioned(items, parallelism)
123
                      .collect(new AsyncParallelStreamingCollector<>(batching(task), new Dispatcher<>(executor, parallelism), ordered))
124
                      .flatMap(Collection::stream);
125
                }
126
            };
127
        }
128
129
        /** Always reports no characteristics, regardless of the {@code ordered} flag. */
        @Override
130
        public Set<Characteristics> characteristics() {
131
            return emptySet();
132
        }
133
    }
134
}

Mutations

40

1.1
Location : <init>
Killed by : com.pivovarit.collectors.test.StreamingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.StreamingTest]/[test-factory:shouldCollectInOriginalOrder()]/[dynamic-test:#1]
negated conditional → KILLED

51

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#53]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED

56

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED

61

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::supplier → KILLED

66

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::accumulator → KILLED

67

1.1
Location : lambda$accumulator$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT

68

1.1
Location : lambda$accumulator$1
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$accumulator$1 → KILLED

74

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::combiner → SURVIVED
Covering tests

82

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::finisher → KILLED

83

1.1
Location : lambda$finisher$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
Covering tests

84

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$finisher$0 → KILLED

90

1.1
Location : characteristics
Killed by : none
replaced return value with Collections.emptySet for com/pivovarit/collectors/AsyncParallelStreamingCollector::characteristics → SURVIVED
Covering tests

99

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#55]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::supplier → KILLED

104

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#8]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::accumulator → KILLED

109

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::combiner → SURVIVED
Covering tests

111

1.1
Location : lambda$combiner$0
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$combiner$0 → NO_COVERAGE

117

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#55]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::finisher → KILLED

118

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BatchingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BatchingTest]/[test-factory:shouldProcessOnExactlyNThreads()]/[dynamic-test:#4]
negated conditional → KILLED

119

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#8]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED

122

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#33]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED

Active mutators

Tests examined


Report generated by PIT 1.19.4