ParallelStreamCollector.java

1
package com.pivovarit.collectors;
2
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.EnumSet;
6
import java.util.List;
7
import java.util.Set;
8
import java.util.concurrent.CompletableFuture;
9
import java.util.concurrent.Executor;
10
import java.util.function.BiConsumer;
11
import java.util.function.BinaryOperator;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
import java.util.stream.Collector;
15
import java.util.stream.Stream;
16
17
import static com.pivovarit.collectors.AsyncParallelCollector.requireValidParallelism;
18
import static com.pivovarit.collectors.BatchingSpliterator.batching;
19
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
20
import static com.pivovarit.collectors.CompletionStrategy.ordered;
21
import static com.pivovarit.collectors.CompletionStrategy.unordered;
22
import static java.util.Collections.emptySet;
23
import static java.util.Objects.requireNonNull;
24
import static java.util.stream.Collectors.collectingAndThen;
25
import static java.util.stream.Collectors.toList;
26
27
/**
28
 * @author Grzegorz Piwowarek
29
 */
30
class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
31
32
    private static final EnumSet<Characteristics> UNORDERED = EnumSet.of(Characteristics.UNORDERED);
33
34
    private final Function<T, R> function;
35
36
    private final CompletionStrategy<R> completionStrategy;
37
38
    private final Set<Characteristics> characteristics;
39
40
    private final Dispatcher<R> dispatcher;
41
42
    private ParallelStreamCollector(
43
        Function<T, R> function,
44
        CompletionStrategy<R> completionStrategy,
45
        Set<Characteristics> characteristics,
46
        Dispatcher<R> dispatcher) {
47
        this.completionStrategy = completionStrategy;
48
        this.characteristics = characteristics;
49
        this.dispatcher = dispatcher;
50
        this.function = function;
51
    }
52
53
    @Override
54
    public Supplier<List<CompletableFuture<R>>> supplier() {
55 1 1. supplier : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED
        return ArrayList::new;
56
    }
57
58
    @Override
59
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
60 2 1. lambda$accumulator$0 : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$accumulator$0 → KILLED
2. accumulator : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED
        return (acc, e) -> acc.add(dispatcher.enqueue(() -> function.apply(e)));
61
    }
62
63
    @Override
64
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
65 1 1. combiner : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED
        return (left, right) -> {
66
            throw new UnsupportedOperationException(
67
                "Using parallel stream with parallel collectors is a bad idea");
68
        };
69
    }
70
71
    @Override
72
    public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
73 1 1. finisher : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED
        return completionStrategy;
74
    }
75
76
    @Override
77
    public Set<Characteristics> characteristics() {
78 1 1. characteristics : replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED
        return characteristics;
79
    }
80
81
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper) {
82
        requireNonNull(mapper, "mapper can't be null");
83
84 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
        return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual());
85
    }
86
87
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
88
        requireNonNull(executor, "executor can't be null");
89
        requireNonNull(mapper, "mapper can't be null");
90 1 1. streaming : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
        requireValidParallelism(parallelism);
91
92 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
        return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.from(executor, parallelism));
93
    }
94
95
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper) {
96
        requireNonNull(mapper, "mapper can't be null");
97
98 1 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED
        return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual());
99
    }
100
101
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
102
        int parallelism) {
103
        requireNonNull(executor, "executor can't be null");
104
        requireNonNull(mapper, "mapper can't be null");
105 1 1. streamingOrdered : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
        requireValidParallelism(parallelism);
106
107 1 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED
        return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.from(executor, parallelism));
108
    }
109
110
    static final class BatchingCollectors {
111
112
        private BatchingCollectors() {
113
        }
114
115
        static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor,
116
            int parallelism) {
117
            requireNonNull(executor, "executor can't be null");
118
            requireNonNull(mapper, "mapper can't be null");
119 1 1. streaming : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
            requireValidParallelism(parallelism);
120
121 2 1. streaming : negated conditional → KILLED
2. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streaming → KILLED
            return parallelism == 1
122
                ? syncCollector(mapper)
123
                : batchingCollector(mapper, executor, parallelism);
124
        }
125
126
        static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
127
            int parallelism) {
128
            requireNonNull(executor, "executor can't be null");
129
            requireNonNull(mapper, "mapper can't be null");
130 1 1. streamingOrdered : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
            requireValidParallelism(parallelism);
131
132 2 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streamingOrdered → KILLED
2. streamingOrdered : negated conditional → KILLED
            return parallelism == 1
133
                ? syncCollector(mapper)
134
                : batchingCollector(mapper, executor, parallelism);
135
        }
136
137
        private static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<T, R> mapper,
138
            Executor executor, int parallelism) {
139 1 1. batchingCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::batchingCollector → KILLED
            return collectingAndThen(
140
                toList(),
141
                list -> {
142
                    // no sense to repack into batches of size 1
143 1 1. lambda$batchingCollector$1 : negated conditional → KILLED
                    if (list.size() == parallelism) {
144 1 1. lambda$batchingCollector$1 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED
                        return list.stream()
145
                            .collect(new ParallelStreamCollector<>(
146
                                mapper,
147
                                ordered(),
148
                                emptySet(),
149
                                Dispatcher.from(executor, parallelism)));
150
                    }
151
                    else {
152 1 1. lambda$batchingCollector$1 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED
                        return partitioned(list, parallelism)
153
                            .collect(collectingAndThen(new ParallelStreamCollector<>(
154
                                    batching(mapper),
155
                                    ordered(),
156
                                    emptySet(),
157
                                    Dispatcher.from(executor, parallelism)),
158 1 1. lambda$batchingCollector$0 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$0 → KILLED
                                s -> s.flatMap(Collection::stream)));
159
                    }
160
                });
161
        }
162
163
        private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
164 1 1. syncCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::syncCollector → KILLED
            return Collector.of(Stream::builder, (rs, t) -> rs.add(mapper.apply(t)), (rs, rs2) -> {
165
                throw new UnsupportedOperationException(
166
                    "Using parallel stream with parallel collectors is a bad idea");
167
            }, Stream.Builder::build);
168
        }
169
170
    }
171
172
}

Mutations

55

1.1
Location : supplier
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#11]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED

60

1.1
Location : lambda$accumulator$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#42]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$accumulator$0 → KILLED

2.2
Location : accumulator
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#42]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED

65

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED

73

1.1
Location : finisher
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#11]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED

78

1.1
Location : characteristics
Killed by : none
replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED

84

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

90

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#28]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

92

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#22]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

98

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#11]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED

105

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#40]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

107

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#34]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED

119

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#11]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

121

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#13]
negated conditional → KILLED

2.2
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#32]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streaming → KILLED

130

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#51]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

132

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#45]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streamingOrdered → KILLED

2.2
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#54]
negated conditional → KILLED

139

1.1
Location : batchingCollector
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#30]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::batchingCollector → KILLED

143

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#54]
negated conditional → KILLED

144

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#22]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED

152

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#29]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED

158

1.1
Location : lambda$batchingCollector$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#29]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$0 → KILLED

164

1.1
Location : syncCollector
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#32]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::syncCollector → KILLED

Active mutators

Tests examined


Report generated by PIT 1.16.0