ParallelStreamCollector.java

1
package com.pivovarit.collectors;
2
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.EnumSet;
6
import java.util.List;
7
import java.util.Set;
8
import java.util.concurrent.CompletableFuture;
9
import java.util.concurrent.Executor;
10
import java.util.function.BiConsumer;
11
import java.util.function.BinaryOperator;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
import java.util.stream.Collector;
15
import java.util.stream.Stream;
16
17
import static com.pivovarit.collectors.BatchingSpliterator.batching;
18
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
19
import static com.pivovarit.collectors.CompletionStrategy.ordered;
20
import static com.pivovarit.collectors.CompletionStrategy.unordered;
21
import static java.util.Collections.emptySet;
22
import static java.util.Objects.requireNonNull;
23
import static java.util.stream.Collectors.collectingAndThen;
24
import static java.util.stream.Collectors.toList;
25
26
/**
27
 * @author Grzegorz Piwowarek
28
 */
29
class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
30
31
    // Characteristics set holding only UNORDERED; streaming(...) selects it when its 'ordered' flag is false.
    private static final EnumSet<Characteristics> UNORDERED = EnumSet.of(Characteristics.UNORDERED);
32
33
    // Mapping applied to each collected element (invoked from the task enqueued by accumulator()).
    private final Function<? super T, ? extends R> function;
34
35
    // Applied by finisher() to the accumulated futures to produce the result stream.
    private final CompletionStrategy<R> completionStrategy;
36
37
    // Returned verbatim by characteristics().
    private final Set<Characteristics> characteristics;
38
39
    // Started and fed by accumulator(), stopped by finisher().
    private final Dispatcher<R> dispatcher;
40
41
    /**
     * Creates a collector from its four collaborators. The arguments are stored as given; no validation
     * happens here.
     *
     * @param function           mapping applied to every collected element
     * @param completionStrategy converts the list of accumulated futures into the resulting stream
     * @param characteristics    value reported by {@link #characteristics()}
     * @param dispatcher         accepts one mapping task per element and returns a future for it
     */
    private ParallelStreamCollector(
42
      Function<? super T, ? extends R> function,
43
      CompletionStrategy<R> completionStrategy,
44
      Set<Characteristics> characteristics,
45
      Dispatcher<R> dispatcher) {
46
        this.completionStrategy = completionStrategy;
47
        this.characteristics = characteristics;
48
        this.dispatcher = dispatcher;
49
        this.function = function;
50
    }
51
52
    /**
     * Supplies the mutable accumulation container.
     *
     * @return a supplier that creates a new, empty {@link ArrayList} of futures
     */
    @Override
53
    public Supplier<List<CompletableFuture<R>>> supplier() {
54 1 1. supplier : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED
        return ArrayList::new;
55
    }
56
57
    /**
     * Returns the accumulation step. For each element it calls {@code dispatcher.start()}, enqueues a task
     * that applies {@code function} to the element, and appends the future returned by the dispatcher to
     * the accumulator list.
     *
     * @return the accumulating function
     */
    @Override
58
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
59 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED
        return (acc, e) -> {
60 1 1. lambda$accumulator$1 : removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT
            // NOTE(review): start() is invoked for every element; presumably it is a no-op once the
            // dispatcher is running - confirm in Dispatcher.
            dispatcher.start();
61 1 1. lambda$accumulator$0 : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$accumulator$0 → KILLED
            // The mapping itself is wrapped in a task and handed to the dispatcher; only the future is kept here.
            acc.add(dispatcher.enqueue(() -> function.apply(e)));
62
        };
63
    }
64
65
    /**
     * Returns a combiner that never merges anything: invoking it always throws
     * {@link UnsupportedOperationException}. Merging partial accumulators is what a parallel stream would
     * request, and this collector rejects that usage.
     *
     * @return a combiner that throws {@link UnsupportedOperationException} on every invocation
     */
    @Override
66
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
67 1 1. combiner : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED
        return (left, right) -> {
68
            throw new UnsupportedOperationException(
69
              "Using parallel stream with parallel collectors is a bad idea");
70
        };
71
    }
72
73
    /**
     * Returns the finishing step: it calls {@code dispatcher.stop()} and then applies the completion strategy
     * to the accumulated futures to obtain the result stream.
     *
     * @return the finishing function
     */
    @Override
74
    public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
75 1 1. finisher : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED
        return acc -> {
76 1 1. lambda$finisher$3 : removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
            // All elements have been enqueued by the time the finisher runs, so the dispatcher is stopped first.
            dispatcher.stop();
77 1 1. lambda$finisher$3 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$finisher$3 → KILLED
            return completionStrategy.apply(acc);
78
        };
79
    }
80
81
    /**
     * Reports the characteristics this collector was constructed with.
     *
     * @return the characteristics set passed to the constructor
     */
    @Override
82
    public Set<Characteristics> characteristics() {
83 1 1. characteristics : replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED
        return characteristics;
84
    }
85
86
    /**
     * Builds a collector that applies {@code mapper} to each element and yields the results as a {@link Stream}.
     *
     * <p>The {@code options} are resolved through {@code Option.process} and select the variant:
     * <ul>
     *   <li>batching present and {@code true}: a parallelism setting is mandatory. With an executor, a
     *       parallelism of 1 yields {@code syncCollector(mapper)} and any other value yields a batching
     *       collector; without an executor a batching collector is always returned.</li>
     *   <li>batching absent or {@code false}: a {@code ParallelStreamCollector} is returned whose dispatcher
     *       depends on which of executor and parallelism were supplied.</li>
     * </ul>
     *
     * <p>{@code ordered} only influences the non-batching variants: the batching branch returns without
     * using the completion strategy and characteristics derived from it.
     *
     * @param mapper  mapping to apply to each element; must not be null
     * @param ordered {@code true} selects the {@code ordered()} completion strategy with an empty
     *                characteristics set, {@code false} selects {@code unordered()} with {@code UNORDERED}
     * @param options configuration options (batching, executor, parallelism are read here)
     * @return the configured collector
     * @throws NullPointerException     if {@code mapper} is null
     * @throws IllegalArgumentException if batching is enabled but no parallelism was provided
     */
    static <T, R> Collector<T, ?, Stream<R>> streaming(Function<? super T, ? extends R> mapper, boolean ordered, Option... options) {
87
        requireNonNull(mapper, "mapper can't be null");
88
89
        Option.Configuration config = Option.process(options);
90
91 1 1. streaming : negated conditional → KILLED
        CompletionStrategy<R> completionStrategy = ordered ? ordered() : unordered();
92 1 1. streaming : negated conditional → SURVIVED
        Set<Characteristics> characteristics = ordered ? emptySet() : UNORDERED;
93
94 2 1. streaming : negated conditional → KILLED
2. streaming : negated conditional → KILLED
        // Batching branch: taken only when the batching option is present and set to true.
        if (config.batching().isPresent() && config.batching().get()) {
95 1 1. streaming : negated conditional → KILLED
            if (config.parallelism().isEmpty()) {
96
                throw new IllegalArgumentException("it's obligatory to provide parallelism when using batching");
97
            }
98
99
            var parallelism = config.parallelism().orElseThrow();
100
101 1 1. streaming : negated conditional → KILLED
            if (config.executor().isPresent()) {
102
                var executor = config.executor().orElseThrow();
103
104 2 1. streaming : negated conditional → KILLED
2. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
                // With a single unit of parallelism the dispatcher-free syncCollector is used instead.
                return parallelism == 1
105
                  ? syncCollector(mapper)
106
                  : batchingCollector(mapper, executor, parallelism);
107
            } else {
108 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → NO_COVERAGE
                // No executor supplied: the two-argument overload is used, and the parallelism == 1
                // shortcut above is not applied on this path.
                return batchingCollector(mapper, parallelism);
109
            }
110
        } else {
111 2 1. streaming : negated conditional → KILLED
2. streaming : negated conditional → KILLED
            // Non-batching branch: choose the dispatcher from the executor/parallelism combination.
            if (config.executor().isPresent() && config.parallelism().isPresent()) {
112
                var executor = config.executor().orElseThrow();
113
                var parallelism = config.parallelism().orElseThrow();
114
115 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
                return new ParallelStreamCollector<>(mapper, completionStrategy, characteristics, Dispatcher.from(executor, parallelism));
116 1 1. streaming : negated conditional → KILLED
            } else if (config.executor().isPresent()) {
117
                var executor = config.executor().orElseThrow();
118
119 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
                return new ParallelStreamCollector<>(mapper, completionStrategy, characteristics, Dispatcher.from(executor));
120 1 1. streaming : negated conditional → KILLED
            } else if (config.parallelism().isPresent()) {
121
                var parallelism = config.parallelism().orElseThrow();
122
123 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
                return new ParallelStreamCollector<>(mapper, completionStrategy, characteristics, Dispatcher.virtual(parallelism));
124
            }
125
126 1 1. streaming : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED
            // Neither executor nor parallelism supplied.
            return new ParallelStreamCollector<>(mapper, completionStrategy, characteristics, Dispatcher.virtual());
127
        }
128
    }
129
130
    /**
     * Builds a collector that first gathers every element into a list and then processes that list with a
     * {@code ParallelStreamCollector} using the {@code ordered()} completion strategy and no characteristics.
     *
     * <p>If the list size equals {@code parallelism}, the elements are collected individually with
     * {@code mapper}. Otherwise the list is split with {@code partitioned(list, parallelism)}, each part is
     * processed through {@code batching(mapper)}, and the per-part results are flattened into one stream.
     *
     * @param mapper      mapping to apply to each element
     * @param executor    executor passed to {@code Dispatcher.from}; may be {@code null}, in which case
     *                    {@code Dispatcher.virtual(parallelism)} is used instead
     * @param parallelism value passed to the dispatcher and used as the partition count
     * @return the batching collector
     */
    static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<? super T, ? extends R> mapper, Executor executor, int parallelism) {
131 1 1. batchingCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::batchingCollector → KILLED
        return collectingAndThen(
132
          toList(),
133
          list -> {
134
              // no sense to repack into batches of size 1
135 1 1. lambda$batchingCollector$5 : negated conditional → SURVIVED
              if (list.size() == parallelism) {
136 1 1. lambda$batchingCollector$5 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$batchingCollector$5 → KILLED
                  return list.stream()
137
                    .collect(new ParallelStreamCollector<>(
138
                      mapper,
139
                      ordered(),
140
                      emptySet(),
141 1 1. lambda$batchingCollector$5 : negated conditional → KILLED
                      executor != null
142
                        ? Dispatcher.from(executor, parallelism)
143
                        : Dispatcher.virtual(parallelism)));
144
              } else {
145 1 1. lambda$batchingCollector$5 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$batchingCollector$5 → KILLED
                  // Each partition is handled as one task; its collection of results is flattened afterwards.
                  return partitioned(list, parallelism)
146
                    .collect(collectingAndThen(new ParallelStreamCollector<>(
147
                        batching(mapper),
148
                        ordered(),
149
                        emptySet(),
150 1 1. lambda$batchingCollector$5 : negated conditional → SURVIVED
                        executor != null
151
                          ? Dispatcher.from(executor, parallelism)
152
                          : Dispatcher.virtual(parallelism)),
153 1 1. lambda$batchingCollector$4 : replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$batchingCollector$4 → KILLED
                      s -> s.flatMap(Collection::stream)));
154
              }
155
          });
156
    }
157
158
    /**
     * Convenience overload for batching without an executor: delegates to
     * {@code batchingCollector(mapper, null, parallelism)}.
     *
     * @param mapper      mapping to apply to each element
     * @param parallelism value forwarded to the three-argument overload
     * @return the batching collector
     */
    static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<? super T, ? extends R> mapper, int parallelism) {
159 1 1. batchingCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::batchingCollector → NO_COVERAGE
        return batchingCollector(mapper, null, parallelism);
160
    }
161
162
    /**
     * Builds a collector that applies {@code mapper} directly inside its accumulator (no dispatcher is
     * involved) and gathers the results in a {@link Stream.Builder}, which is built into the resulting stream
     * by the finisher. Its combiner always throws {@link UnsupportedOperationException}.
     *
     * @param mapper mapping to apply to each element
     * @return the collector
     */
    static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<? super T, ? extends R> mapper) {
163 1 1. syncCollector : replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::syncCollector → KILLED
        return Collector.of(Stream::builder, (rs, t) -> rs.add(mapper.apply(t)), (rs, rs2) -> {
164
            throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
165
        }, Stream.Builder::build);
166
    }
167
}

Mutations

54

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::supplier → KILLED

59

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::accumulator → KILLED

60

1.1
Location : lambda$accumulator$1
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT

61

1.1
Location : lambda$accumulator$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::lambda$accumulator$0 → KILLED

67

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::combiner → SURVIVED
Covering tests

75

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::finisher → KILLED

76

1.1
Location : lambda$finisher$3
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
Covering tests

77

1.1
Location : lambda$finisher$3
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#10]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$finisher$3 → KILLED

83

1.1
Location : characteristics
Killed by : none
replaced return value with Collections.emptySet for com/pivovarit/collectors/ParallelStreamCollector::characteristics → SURVIVED
Covering tests

91

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.StreamingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.StreamingTest]/[test-factory:shouldCollectInOriginalOrder()]/[dynamic-test:#1]
negated conditional → KILLED

92

1.1
Location : streaming
Killed by : none
negated conditional → SURVIVED
Covering tests

94

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BatchingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BatchingTest]/[test-factory:shouldProcessOnExactlyNThreads()]/[dynamic-test:#4]
negated conditional → KILLED

2.2
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
negated conditional → KILLED

95

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#8]
negated conditional → KILLED

101

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#6]
negated conditional → KILLED

104

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#6]
negated conditional → KILLED

2.2
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#8]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

108

1.1
Location : streaming
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → NO_COVERAGE

111

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#86]
negated conditional → KILLED

2.2
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#17]
negated conditional → KILLED

115

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#32]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

116

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
negated conditional → KILLED

119

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#17]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

120

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
negated conditional → KILLED

123

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#53]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

126

1.1
Location : streaming
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::streaming → KILLED

131

1.1
Location : batchingCollector
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#19]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::batchingCollector → KILLED

135

1.1
Location : lambda$batchingCollector$5
Killed by : none
negated conditional → SURVIVED
Covering tests

136

1.1
Location : lambda$batchingCollector$5
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#88]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$batchingCollector$5 → KILLED

141

1.1
Location : lambda$batchingCollector$5
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#6]
negated conditional → KILLED

145

1.1
Location : lambda$batchingCollector$5
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#19]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$batchingCollector$5 → KILLED

150

1.1
Location : lambda$batchingCollector$5
Killed by : none
negated conditional → SURVIVED
Covering tests

153

1.1
Location : lambda$batchingCollector$4
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#19]
replaced return value with Stream.empty for com/pivovarit/collectors/ParallelStreamCollector::lambda$batchingCollector$4 → KILLED

159

1.1
Location : batchingCollector
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::batchingCollector → NO_COVERAGE

163

1.1
Location : syncCollector
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#8]
replaced return value with null for com/pivovarit/collectors/ParallelStreamCollector::syncCollector → KILLED

Active mutators

Tests examined


Report generated by PIT 1.19.1