ParallelStreamCollector.java

1
package com.pivovarit.collectors;
2
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.EnumSet;
6
import java.util.List;
7
import java.util.Set;
8
import java.util.concurrent.CompletableFuture;
9
import java.util.concurrent.Executor;
10
import java.util.function.BiConsumer;
11
import java.util.function.BinaryOperator;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
import java.util.stream.Collector;
15
import java.util.stream.Stream;
16
17
import static com.pivovarit.collectors.AsyncParallelCollector.requireValidParallelism;
18
import static com.pivovarit.collectors.BatchingSpliterator.batching;
19
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
20
import static com.pivovarit.collectors.CompletionStrategy.ordered;
21
import static com.pivovarit.collectors.CompletionStrategy.unordered;
22
import static com.pivovarit.collectors.Dispatcher.getDefaultParallelism;
23
import static java.util.Collections.emptySet;
24
import static java.util.Objects.requireNonNull;
25
import static java.util.stream.Collectors.collectingAndThen;
26
import static java.util.stream.Collectors.toList;
27
28
/**
29
 * @author Grzegorz Piwowarek
30
 */
31
class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
32
33
    private static final EnumSet<Characteristics> UNORDERED = EnumSet.of(Characteristics.UNORDERED);
34
35
    private final Function<T, R> function;
36
37
    private final CompletionStrategy<R> completionStrategy;
38
39
    private final Set<Characteristics> characteristics;
40
41
    private final Dispatcher<R> dispatcher;
42
43
    /**
     * Creates a collector from its four collaborators; the arguments are stored as-is,
     * without validation or copying.
     *
     * @param function           mapping applied to every collected element (see {@link #accumulator()})
     * @param completionStrategy turns the list of pending futures into the result stream
     *                           (see {@link #finisher()})
     * @param characteristics    value returned verbatim by {@link #characteristics()}
     * @param dispatcher         receives the per-element tasks; started in the accumulator and
     *                           stopped in the finisher
     */
    private ParallelStreamCollector(
44
        Function<T, R> function,
45
        CompletionStrategy<R> completionStrategy,
46
        Set<Characteristics> characteristics,
47
        Dispatcher<R> dispatcher) {
48
        this.completionStrategy = completionStrategy;
49
        this.characteristics = characteristics;
50
        this.dispatcher = dispatcher;
51
        this.function = function;
52
    }
53
54
    /**
     * Provides the factory for the mutable accumulation container: a new, empty
     * {@link ArrayList} of {@link CompletableFuture}s.
     *
     * @return a supplier creating a fresh {@code ArrayList} on every call
     */
    @Override
55
    public Supplier<List<CompletableFuture<R>>> supplier() {
56 1 1. supplier : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED
        return ArrayList::new;
57
    }
58
59
    /**
     * Returns the per-element accumulation step. For every element it calls
     * {@code dispatcher.start()}, hands a task applying {@code function} to that element to
     * {@code dispatcher.enqueue(...)}, and appends the returned future to the accumulator list.
     *
     * <p>NOTE(review): {@code start()} is invoked once per element, not once per collection;
     * presumably it is idempotent - confirm against {@code Dispatcher}.
     *
     * @return a consumer that schedules the mapping of one element and records its future
     */
    @Override
60
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
61 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED
        return (acc, e) -> {
62 1 1. lambda$accumulator$1 : removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT
            dispatcher.start();
63 1 1. lambda$null$0 : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$null$0 → KILLED
            acc.add(dispatcher.enqueue(() -> function.apply(e)));
64
        };
65
    }
66
67
    /**
     * Returns a combiner that refuses to merge partial results: whenever it is invoked it
     * throws {@link UnsupportedOperationException} instead of combining the two lists.
     * As the exception message states, this collector is not meant to be used with a
     * parallel stream.
     *
     * @return an operator that always throws {@code UnsupportedOperationException}
     */
    @Override
68
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
69 1 1. combiner : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED
        return (left, right) -> {
70
            throw new UnsupportedOperationException(
71
                "Using parallel stream with parallel collectors is a bad idea");
72
        };
73
    }
74
75
    /**
     * Returns the finishing step: it first calls {@code dispatcher.stop()} and then lets the
     * configured {@code completionStrategy} convert the accumulated futures into the
     * resulting {@link Stream}.
     *
     * @return a function mapping the list of futures to the result stream
     */
    @Override
76
    public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
77 1 1. finisher : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED
        return acc -> {
78 1 1. lambda$finisher$3 : removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
            dispatcher.stop();
79 1 1. lambda$finisher$3 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$finisher$3 → KILLED
            return completionStrategy.apply(acc);
80
        };
81
    }
82
83
    /**
     * Returns the characteristics set supplied at construction time, unchanged.
     *
     * @return the same {@code Set} instance that was passed to the constructor
     */
    @Override
84
    public Set<Characteristics> characteristics() {
85 1 1. characteristics : replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED
        return characteristics;
86
    }
87
88
    /**
     * Convenience overload of {@link #streaming(Function, Executor, int)} that uses the
     * value of {@code Dispatcher.getDefaultParallelism()} as the parallelism.
     *
     * @param mapper   function applied to each element
     * @param executor executor handed to the dispatcher
     * @return the collector produced by the three-argument overload
     */
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor) {
89 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → NO_COVERAGE
        return streaming(mapper, executor, getDefaultParallelism());
90
    }
91
92
    /**
     * Builds a collector configured with the {@code unordered()} completion strategy and
     * the {@code UNORDERED} characteristic, backed by {@code Dispatcher.of(executor, parallelism)}.
     *
     * <p>Arguments are checked in this order: {@code executor}, {@code mapper}, then
     * {@code parallelism} (via {@code requireValidParallelism}, whose exact rule is defined in
     * {@code AsyncParallelCollector} and is not visible here).
     *
     * @param mapper      function applied to each element
     * @param executor    executor handed to the dispatcher
     * @param parallelism parallelism handed to the dispatcher
     * @return a new collector producing a {@link Stream} of mapped results
     * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
     */
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
93
        requireNonNull(executor, "executor can't be null");
94
        requireNonNull(mapper, "mapper can't be null");
95 1 1. streaming : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
        requireValidParallelism(parallelism);
96
97 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
        return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.of(executor, parallelism));
98
    }
99
100
    /**
     * Convenience overload of {@link #streamingOrdered(Function, Executor, int)} that uses
     * the value of {@code Dispatcher.getDefaultParallelism()} as the parallelism.
     *
     * @param mapper   function applied to each element
     * @param executor executor handed to the dispatcher
     * @return the collector produced by the three-argument overload
     */
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor) {
101 1 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → NO_COVERAGE
        return streamingOrdered(mapper, executor, getDefaultParallelism());
102
    }
103
104
    /**
     * Builds a collector configured with the {@code ordered()} completion strategy and an
     * empty characteristics set, backed by {@code Dispatcher.of(executor, parallelism)}.
     *
     * <p>Arguments are checked in this order: {@code executor}, {@code mapper}, then
     * {@code parallelism} (via {@code requireValidParallelism}, whose exact rule is defined in
     * {@code AsyncParallelCollector} and is not visible here).
     *
     * @param mapper      function applied to each element
     * @param executor    executor handed to the dispatcher
     * @param parallelism parallelism handed to the dispatcher
     * @return a new collector producing a {@link Stream} of mapped results
     * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
     */
    static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
105
        int parallelism) {
106
        requireNonNull(executor, "executor can't be null");
107
        requireNonNull(mapper, "mapper can't be null");
108 1 1. streamingOrdered : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
        requireValidParallelism(parallelism);
109
110 1 1. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED
        return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.of(executor, parallelism));
111
    }
112
113
    static final class BatchingCollectors {
114
115
        // Private and empty: this nested class only exposes static factory methods.
        private BatchingCollectors() {
116
        }
117
118
        /**
         * Batching variant of the streaming collector. After validating the arguments it
         * picks one of two implementations: for {@code parallelism == 1} the elements are
         * mapped by {@code syncCollector} without using the executor; otherwise
         * {@code batchingCollector} is used.
         *
         * <p>NOTE(review): the body is identical to that of {@code streamingOrdered} in this
         * class, and {@code batchingCollector} always uses the {@code ordered()} strategy, so
         * this variant currently behaves the same as the ordered one - confirm this is intended.
         *
         * @param mapper      function applied to each element
         * @param executor    executor used when {@code parallelism != 1}
         * @param parallelism requested parallelism; checked by {@code requireValidParallelism}
         * @return a collector producing a {@link Stream} of mapped results
         * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
         */
        static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor,
119
            int parallelism) {
120
            requireNonNull(executor, "executor can't be null");
121
            requireNonNull(mapper, "mapper can't be null");
122 1 1. streaming : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
            requireValidParallelism(parallelism);
123
124 2 1. streaming : negated conditional → TIMED_OUT
2. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streaming → KILLED
            return parallelism == 1
125
                ? syncCollector(mapper)
126
                : batchingCollector(mapper, executor, parallelism);
127
        }
128
129
        /**
         * Batching variant of the ordered streaming collector. After validating the
         * arguments it picks one of two implementations: for {@code parallelism == 1} the
         * elements are mapped by {@code syncCollector} without using the executor; otherwise
         * {@code batchingCollector} (which uses the {@code ordered()} strategy) is used.
         *
         * @param mapper      function applied to each element
         * @param executor    executor used when {@code parallelism != 1}
         * @param parallelism requested parallelism; checked by {@code requireValidParallelism}
         * @return a collector producing a {@link Stream} of mapped results
         * @throws NullPointerException if {@code executor} or {@code mapper} is {@code null}
         */
        static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
130
            int parallelism) {
131
            requireNonNull(executor, "executor can't be null");
132
            requireNonNull(mapper, "mapper can't be null");
133 1 1. streamingOrdered : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED
            requireValidParallelism(parallelism);
134
135 2 1. streamingOrdered : negated conditional → TIMED_OUT
2. streamingOrdered : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streamingOrdered → KILLED
            return parallelism == 1
136
                ? syncCollector(mapper)
137
                : batchingCollector(mapper, executor, parallelism);
138
        }
139
140
        /**
         * Builds the executor-backed batching collector. All upstream elements are first
         * gathered into a {@link List}; the finishing function then re-collects that list
         * through a {@code ParallelStreamCollector} in one of two ways:
         * <ul>
         *   <li>when the list size equals {@code parallelism}, every element is submitted as
         *       its own task using {@code mapper} directly;</li>
         *   <li>otherwise the list is split by {@code partitioned(list, parallelism)}, each
         *       partition is processed as one task through {@code batching(mapper)}, and the
         *       per-partition collections are flattened back into a single stream.</li>
         * </ul>
         * Both paths use the {@code ordered()} strategy, an empty characteristics set and a
         * new {@code Dispatcher.of(executor, parallelism)}.
         *
         * <p>NOTE(review): the shortcut is taken only for {@code size == parallelism}; lists
         * smaller than {@code parallelism} still go through the partitioning path - presumably
         * also yielding batches of size 1; confirm against {@code BatchingSpliterator}.
         *
         * @param mapper      function applied to each element
         * @param executor    executor handed to the dispatcher
         * @param parallelism parallelism handed to the dispatcher and to {@code partitioned}
         * @return a collector producing a {@link Stream} of mapped results
         */
        private static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<T, R> mapper,
141
            Executor executor, int parallelism) {
142 1 1. batchingCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::batchingCollector → KILLED
            return collectingAndThen(
143
                toList(),
144
                list -> {
145
                    // no sense to repack into batches of size 1
146 1 1. lambda$batchingCollector$1 : negated conditional → KILLED
                    if (list.size() == parallelism) {
147 1 1. lambda$batchingCollector$1 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED
                        return list.stream()
148
                            .collect(new ParallelStreamCollector<>(
149
                                mapper,
150
                                ordered(),
151
                                emptySet(),
152
                                Dispatcher.of(executor, parallelism)));
153
                    }
154
                    else {
155 1 1. lambda$batchingCollector$1 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED
                        return partitioned(list, parallelism)
156
                            .collect(collectingAndThen(new ParallelStreamCollector<>(
157
                                    batching(mapper),
158
                                    ordered(),
159
                                    emptySet(),
160
                                    Dispatcher.of(executor, parallelism)),
161 1 1. lambda$null$0 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$null$0 → KILLED
                                s -> s.flatMap(Collection::stream)));
162
                    }
163
                });
164
        }
165
166
        /**
         * Builds the collector used for {@code parallelism == 1}: it applies {@code mapper}
         * inside the accumulator itself and adds each result to a {@link Stream.Builder};
         * no executor or dispatcher is involved. The combiner always throws
         * {@link UnsupportedOperationException}, and the finisher builds the stream.
         *
         * @param mapper function applied to each element
         * @return a collector producing a {@link Stream} of mapped results
         */
        private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
167 1 1. syncCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::syncCollector → KILLED
            return Collector.of(Stream::builder, (rs, t) -> rs.add(mapper.apply(t)), (rs, rs2) -> {
168
                throw new UnsupportedOperationException(
169
                    "Using parallel stream with parallel collectors is a bad idea");
170
            }, Stream.Builder::build);
171
        }
172
173
    }
174
175
}

Mutations

56

1.1
Location : supplier
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED

61

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED

62

1.1
Location : lambda$accumulator$1
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT

63

1.1
Location : lambda$null$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$null$0 → KILLED

69

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED

77

1.1
Location : finisher
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED

78

1.1
Location : lambda$finisher$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED

79

1.1
Location : lambda$finisher$3
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#14]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$finisher$3 → KILLED

85

1.1
Location : characteristics
Killed by : none
replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED

89

1.1
Location : streaming
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → NO_COVERAGE

95

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#12]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

97

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#6]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

101

1.1
Location : streamingOrdered
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → NO_COVERAGE

108

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#24]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

110

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streamingOrdered → KILLED

122

1.1
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#12]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

124

1.1
Location : streaming
Killed by : none
negated conditional → TIMED_OUT

2.2
Location : streaming
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#6]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streaming → KILLED

133

1.1
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#25]
removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED

135

1.1
Location : streamingOrdered
Killed by : none
negated conditional → TIMED_OUT

2.2
Location : streamingOrdered
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::streamingOrdered → KILLED

142

1.1
Location : batchingCollector
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#16]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::batchingCollector → KILLED

146

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#27]
negated conditional → KILLED

147

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#18]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED

155

1.1
Location : lambda$batchingCollector$1
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#15]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$batchingCollector$1 → KILLED

161

1.1
Location : lambda$null$0
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#15]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::lambda$null$0 → KILLED

167

1.1
Location : syncCollector
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector$BatchingCollectors::syncCollector → KILLED

Active mutators

Tests examined


Report generated by PIT 1.9.8