ParallelStreamCollector.java

1
package com.pivovarit.collectors;
2
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.EnumSet;
6
import java.util.List;
7
import java.util.Set;
8
import java.util.concurrent.CompletableFuture;
9
import java.util.concurrent.Executor;
10
import java.util.function.BiConsumer;
11
import java.util.function.BinaryOperator;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
import java.util.stream.Collector;
15
import java.util.stream.Stream;
16
17
import static com.pivovarit.collectors.AsyncParallelCollector.requireValidParallelism;
18
import static com.pivovarit.collectors.BatchingSpliterator.batching;
19
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
20
import static com.pivovarit.collectors.CompletionStrategy.ordered;
21
import static com.pivovarit.collectors.CompletionStrategy.unordered;
22
import static java.util.Collections.emptySet;
23
import static java.util.Objects.requireNonNull;
24
import static java.util.stream.Collectors.collectingAndThen;
25
import static java.util.stream.Collectors.toList;
26
27
/**
28
 * @author Grzegorz Piwowarek
29
 */
30
class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
31
32
    private static final EnumSet<Characteristics> UNORDERED = EnumSet.of(Characteristics.UNORDERED);
33
34
    private final Function<T, R> function;
35
36
    private final CompletionStrategy<R> completionStrategy;
37
38
    private final Set<Characteristics> characteristics;
39
40
    private final Dispatcher<R> dispatcher;
41
42
    private ParallelStreamCollector(
43
      Function<T, R> function,
44
      CompletionStrategy<R> completionStrategy,
45
      Set<Characteristics> characteristics,
46
      Dispatcher<R> dispatcher) {
47
        this.completionStrategy = completionStrategy;
48
        this.characteristics = characteristics;
49
        this.dispatcher = dispatcher;
50
        this.function = function;
51
    }
52
53
    @Override
54
    public Supplier<List<CompletableFuture<R>>> supplier() {
55 1 1. supplier : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED
        return ArrayList::new;
56
    }
57
58
    @Override
59
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
60 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED
        return (acc, e) -> {
61 1 1. lambda$accumulator$1 : removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT
            dispatcher.start();
62 1 1. lambda$accumulator$0 : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$accumulator$0 → KILLED
            acc.add(dispatcher.enqueue(() -> function.apply(e)));
63
        };
64
    }
65
66
    @Override
67
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
68 1 1. combiner : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED
        return (left, right) -> {
69
            throw new UnsupportedOperationException(
70
              "Using parallel stream with parallel collectors is a bad idea");
71
        };
72
    }
73
74
    @Override
75
    public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
76 1 1. finisher : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED
        return acc -> {
77 1 1. lambda$finisher$3 : removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
            dispatcher.stop();
78 1 1. lambda$finisher$3 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$finisher$3 → KILLED
            return completionStrategy.apply(acc);
79
        };
80
    }
81
82
    @Override
83
    public Set<Characteristics> characteristics() {
84 1 1. characteristics : replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED
        return characteristics;
85
    }
86
87
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper) {
88
        requireNonNull(mapper, "mapper can't be null");
89
90 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
        return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual());
91
    }
92
93
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, int parallelism) {
94
        requireNonNull(mapper, "mapper can't be null");
95 1 1. streaming : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → SURVIVED
        requireValidParallelism(parallelism);
96
97 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
        return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual(parallelism));
98
    }
99
100
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
101
        requireNonNull(executor, "executor can't be null");
102
        requireNonNull(mapper, "mapper can't be null");
103 1 1. streaming : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
        requireValidParallelism(parallelism);
104
105 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
        return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.from(executor, parallelism));
106
    }
107
108
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper) {
109
        requireNonNull(mapper, "mapper can't be null");
110
111 1 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED
        return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual());
112
    }
113
114
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, int parallelism) {
115
        requireNonNull(mapper, "mapper can't be null");
116 1 1. streamingOrdered : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → SURVIVED
        requireValidParallelism(parallelism);
117
118 1 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED
        return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual(parallelism));
119
    }
120
121
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
122
                                                              int parallelism) {
123
        requireNonNull(executor, "executor can't be null");
124
        requireNonNull(mapper, "mapper can't be null");
125 1 1. streamingOrdered : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
        requireValidParallelism(parallelism);
126
127 1 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED
        return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.from(executor, parallelism));
128
    }
129
130
    static final class BatchingCollectors {
131
132
        private BatchingCollectors() {
133
        }
134
135
        static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor,
136
                                                           int parallelism) {
137
            requireNonNull(executor, "executor can't be null");
138
            requireNonNull(mapper, "mapper can't be null");
139 1 1. streaming : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
            requireValidParallelism(parallelism);
140
141 2 1. streaming : negated conditional → KILLED
2. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streaming → KILLED
            return parallelism == 1
142
              ? syncCollector(mapper)
143
              : batchingCollector(mapper, executor, parallelism);
144
        }
145
146
        static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
147
                                                                  int parallelism) {
148
            requireNonNull(executor, "executor can't be null");
149
            requireNonNull(mapper, "mapper can't be null");
150 1 1. streamingOrdered : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
            requireValidParallelism(parallelism);
151
152 2 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streamingOrdered → KILLED
2. streamingOrdered : negated conditional → KILLED
            return parallelism == 1
153
              ? syncCollector(mapper)
154
              : batchingCollector(mapper, executor, parallelism);
155
        }
156
157
        private static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<T, R> mapper,
158
                                                                           Executor executor, int parallelism) {
159 1 1. batchingCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::batchingCollector → KILLED
            return collectingAndThen(
160
              toList(),
161
              list -> {
162
                  // no sense to repack into batches of size 1
163 1 1. lambda$batchingCollector$1 : negated conditional → KILLED
                  if (list.size() == parallelism) {
164 1 1. lambda$batchingCollector$1 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED
                      return list.stream()
165
                        .collect(new ParallelStreamCollector<>(
166
                          mapper,
167
                          ordered(),
168
                          emptySet(),
169
                          Dispatcher.from(executor, parallelism)));
170
                  } else {
171 1 1. lambda$batchingCollector$1 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED
                      return partitioned(list, parallelism)
172
                        .collect(collectingAndThen(new ParallelStreamCollector<>(
173
                            batching(mapper),
174
                            ordered(),
175
                            emptySet(),
176
                            Dispatcher.from(executor, parallelism)),
177 1 1. lambda$batchingCollector$0 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$0 → KILLED
                          s -> s.flatMap(Collection::stream)));
178
                  }
179
              });
180
        }
181
182
        private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
183 1 1. syncCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::syncCollector → KILLED
            return Collector.of(Stream::builder, (rs, t) -> rs.add(mapper.apply(t)), (rs, rs2) -> {
184
                throw new UnsupportedOperationException(
185
                  "Using parallel stream with parallel collectors is a bad idea");
186
            }, Stream.Builder::build);
187
        }
188
    }
189
}

Mutations

55

1.1
Location : supplier
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#33]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED

60

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED

61

1.1
Location : lambda$accumulator$1
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT

62

1.1
Location : lambda$accumulator$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$accumulator$0 → KILLED

68

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED

76

1.1
Location : finisher
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#33]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED

77

1.1
Location : lambda$finisher$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED

78

1.1
Location : lambda$finisher$3
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#10]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$finisher$3 → KILLED

84

1.1
Location : characteristics
Killed by : none
replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED

90

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

95

1.1
Location : streaming
Killed by : none
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → SURVIVED

97

1.1
Location : streaming
Killed by : com.pivovarit.collectors.functional.ExecutorPollutionTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorPollutionTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#6]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

103

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#29]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

105

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#22]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

111

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#11]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED

116

1.1
Location : streamingOrdered
Killed by : none
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → SURVIVED

118

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.functional.ExecutorPollutionTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorPollutionTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#8]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED

125

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#42]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

127

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#33]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED

139

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#12]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

141

1.1
Location : streaming
Killed by : com.pivovarit.collectors.functional.ExecutorValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorValidationTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#9]
negated conditional → KILLED

2.2
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#34]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streaming → KILLED

150

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#55]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

152

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#19]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streamingOrdered → KILLED

2.2
Location : streamingOrdered
Killed by : com.pivovarit.collectors.functional.ExecutorValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorValidationTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#11]
negated conditional → KILLED

159

1.1
Location : batchingCollector
Killed by : com.pivovarit.collectors.functional.ExecutorValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.functional.ExecutorValidationTest]/[test-factory:shouldStartProcessingElementsTests()]/[dynamic-test:#11]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::batchingCollector → KILLED

163

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#58]
negated conditional → KILLED

164

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#53]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED

171

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#45]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED

177

1.1
Location : lambda$batchingCollector$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#45]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$0 → KILLED

183

1.1
Location : syncCollector
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#34]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::syncCollector → KILLED

Active mutators

Tests examined


Report generated by PIT 1.16.1