AsyncParallelStreamingCollector.java

1
package com.pivovarit.collectors;
2
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.EnumSet;
6
import java.util.List;
7
import java.util.Set;
8
import java.util.concurrent.CompletableFuture;
9
import java.util.concurrent.Executor;
10
import java.util.function.BiConsumer;
11
import java.util.function.BinaryOperator;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
import java.util.stream.Collector;
15
import java.util.stream.Stream;
16
import java.util.stream.StreamSupport;
17
18
import static com.pivovarit.collectors.BatchingSpliterator.batching;
19
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
20
import static java.util.Collections.emptySet;
21
22
/**
23
 * @author Grzegorz Piwowarek
24
 */
25
// Collector that hands one task per stream element to a Dispatcher and exposes the results as a
// Stream<R>. The accumulation container is a list holding one CompletableFuture<R> per element;
// the finisher converts that list into the resulting stream.
class AsyncParallelStreamingCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
26
27
    // Mapping applied to each element inside the task passed to the dispatcher.
    private final Function<? super T, ? extends R> function;
28
29
    // ORDERED or UNORDERED, derived from the constructor's 'ordered' flag; selects the finisher's stream.
    private final CompletionStrategy completionStrategy;
30
31
    // Characteristics reported by this collector: empty when ordered, {UNORDERED} otherwise.
    private final Set<Characteristics> characteristics;
32
33
    // Receives the enqueued tasks; start() is called in the accumulator and stop() in the finisher.
    private final Dispatcher<R> dispatcher;
34
35
    /**
     * Creates the collector from an already constructed dispatcher.
     *
     * @param function   mapping applied to every collected element
     * @param dispatcher dispatcher the per-element tasks are enqueued on
     * @param ordered    {@code true} selects {@code CompletionStrategy.ORDERED} and an empty
     *                   characteristics set; {@code false} selects {@code CompletionStrategy.UNORDERED}
     *                   and advertises {@code Characteristics.UNORDERED}
     */
    private AsyncParallelStreamingCollector(
36
      Function<? super T, ? extends R> function,
37
      Dispatcher<R> dispatcher,
38
      boolean ordered) {
39 1 1. <init> : negated conditional → KILLED
        this.completionStrategy = ordered ? CompletionStrategy.ORDERED : CompletionStrategy.UNORDERED;
40
        this.characteristics = switch (completionStrategy) {
41
            case ORDERED -> emptySet();
42
            case UNORDERED -> EnumSet.of(Characteristics.UNORDERED);
43
        };
44
        this.dispatcher = dispatcher;
45
        this.function = function;
46
    }
47
48
    /**
     * Builds a collector whose dispatcher is created from the executor and an explicit parallelism.
     *
     * @param function    mapping applied to every collected element
     * @param executor    executor passed to the new {@code Dispatcher}
     * @param parallelism parallelism value passed to the new {@code Dispatcher}
     * @param ordered     whether the ordered completion strategy is used
     * @return a new collector producing a {@code Stream<R>}
     */
    public static <T, R> Collector<T, ?, Stream<R>> from(
49
      Function<? super T, ? extends R> function, Executor executor, int parallelism, boolean ordered) {
50 1 1. from : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED
        return new AsyncParallelStreamingCollector<>(function, new Dispatcher<>(executor, parallelism), ordered);
51
    }
52
53
    /**
     * Builds a collector whose dispatcher is created from the executor alone; no parallelism value
     * is passed, so any limit is decided by {@code Dispatcher} (not visible here).
     *
     * @param function mapping applied to every collected element
     * @param executor executor passed to the new {@code Dispatcher}
     * @param ordered  whether the ordered completion strategy is used
     * @return a new collector producing a {@code Stream<R>}
     */
    public static <T, R> Collector<T, ?, Stream<R>> from(
54
      Function<? super T, ? extends R> function, Executor executor, boolean ordered) {
55 1 1. from : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED
        return new AsyncParallelStreamingCollector<>(function, new Dispatcher<>(executor), ordered);
56
    }
57
58
    /** Supplies a new, empty {@code ArrayList} as the container for the per-element futures. */
    @Override
59
    public Supplier<List<CompletableFuture<R>>> supplier() {
60 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::supplier → KILLED
        return ArrayList::new;
61
    }
62
63
    /**
     * Returns the accumulator: for each element it enqueues {@code function.apply(e)} on the
     * dispatcher and appends the future returned by {@code enqueue} to the container.
     */
    @Override
64
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
65 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::accumulator → KILLED
        return (acc, e) -> {
            // start() is invoked once per element; presumably repeated calls are harmless
            // NOTE(review): confirm against Dispatcher.start().
66 1 1. lambda$accumulator$0 : removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT
            dispatcher.start();
67 1 1. lambda$accumulator$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$accumulator$1 → KILLED
            acc.add(dispatcher.enqueue(() -> function.apply(e)));
68
        };
69
    }
70
71
    /**
     * Returns a combiner that never merges: invoking it always throws
     * {@code UnsupportedOperationException}.
     */
    @Override
72
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
73 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::combiner → SURVIVED
        return (left, right) -> {
74
            throw new UnsupportedOperationException(
75
              "Using parallel stream with parallel collectors is a bad idea");
76
        };
77
    }
78
79
    /**
     * Returns the finisher: it calls {@code dispatcher.stop()} and then builds the result stream
     * according to the completion strategy.
     */
    @Override
80
    public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
81 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::finisher → KILLED
        return acc -> {
82 1 1. lambda$finisher$0 : removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
            dispatcher.stop();
83 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$finisher$0 → KILLED
            return switch (completionStrategy) {
                // ORDERED: stream over the futures in list order, joining each one as it is consumed.
84
                case ORDERED -> acc.stream().map(CompletableFuture::join);
                // UNORDERED: sequential (parallel=false) stream backed by CompletionOrderSpliterator;
                // presumably yields results as futures complete (TODO confirm in that class).
85
                case UNORDERED -> StreamSupport.stream(new CompletionOrderSpliterator<>(acc), false);
86
            };
87
        };
88
    }
89
90
    /** Returns the characteristics set computed in the constructor. */
    @Override
91
    public Set<Characteristics> characteristics() {
92 1 1. characteristics : replaced return value with Collections.emptySet for com/pivovarit/collectors/AsyncParallelStreamingCollector::characteristics → KILLED
        return characteristics;
93
    }
94
95
    // Collector that first buffers every element in an ArrayList and only submits work in the
    // finisher, either one task per element or one task per partition of elements.
    record BatchingCollector<T, R>(Function<? super T, ? extends R> task, Executor executor, int parallelism,
96
                                   boolean ordered)
97
      implements Collector<T, ArrayList<T>, Stream<R>> {
98
99
        /** Supplies a new, empty {@code ArrayList} used to buffer the incoming elements. */
        @Override
100
        public Supplier<ArrayList<T>> supplier() {
101 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::supplier → KILLED
            return ArrayList::new;
102
        }
103
104
        /** Returns an accumulator that appends each element to the buffer. */
        @Override
105
        public BiConsumer<ArrayList<T>, T> accumulator() {
106 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::accumulator → KILLED
            return ArrayList::add;
107
        }
108
109
        /** Returns a combiner that appends the right buffer to the left one and returns the left. */
        @Override
110
        public BinaryOperator<ArrayList<T>> combiner() {
111 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::combiner → SURVIVED
            return (left, right) -> {
112
                left.addAll(right);
113 1 1. lambda$combiner$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$combiner$0 → NO_COVERAGE
                return left;
114
            };
115
        }
116
117
        /**
         * Returns the finisher, which re-collects the buffered items with a fresh
         * {@code AsyncParallelStreamingCollector} backed by a new {@code Dispatcher}.
         */
        @Override
118
        public Function<ArrayList<T>, Stream<R>> finisher() {
119 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::finisher → KILLED
            return items -> {
                // Exactly 'parallelism' items: no batching, each item is submitted as its own task.
120 1 1. lambda$finisher$0 : negated conditional → KILLED
                if (items.size() == parallelism) {
121 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return items.stream()
122
                      .collect(new AsyncParallelStreamingCollector<>(task, new Dispatcher<>(executor, parallelism), ordered));
123
                } else {
                    // Any other size: items go through partitioned(items, parallelism) and every
                    // partition is handled by batching(task); the per-partition collections are
                    // flattened back into a single stream. Presumably partitioned() yields at most
                    // 'parallelism' batches (TODO confirm in BatchingSpliterator).
124 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return partitioned(items, parallelism)
125
                      .collect(new AsyncParallelStreamingCollector<>(batching(task), new Dispatcher<>(executor, parallelism), ordered))
126
                      .flatMap(Collection::stream);
127
                }
128
            };
129
        }
130
131
        /** Always returns an empty set; UNORDERED is not advertised here even when {@code ordered} is false. */
        @Override
132
        public Set<Characteristics> characteristics() {
133
            return emptySet();
134
        }
135
    }
136
}

Mutations

39

1.1
Location : <init>
Killed by : com.pivovarit.collectors.test.GroupingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.GroupingTest]/[test-factory:shouldGroupByClassifierOrdered()]/[dynamic-test:#2]
negated conditional → KILLED

50

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#29]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED

55

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#27]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::from → KILLED

60

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#27]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::supplier → KILLED

65

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#18]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::accumulator → KILLED

66

1.1
Location : lambda$accumulator$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT

67

1.1
Location : lambda$accumulator$1
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#18]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$accumulator$1 → KILLED

73

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::combiner → SURVIVED
Covering tests

81

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#27]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::finisher → KILLED

82

1.1
Location : lambda$finisher$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
Covering tests

83

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#18]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$finisher$0 → KILLED

92

1.1
Location : characteristics
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldInterruptOnException()]/[dynamic-test:#23]
replaced return value with Collections.emptySet for com/pivovarit/collectors/AsyncParallelStreamingCollector::characteristics → KILLED

101

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#107]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::supplier → KILLED

106

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#150]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::accumulator → KILLED

111

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::combiner → SURVIVED
Covering tests

113

1.1
Location : lambda$combiner$0
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$combiner$0 → NO_COVERAGE

119

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#107]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::finisher → KILLED

120

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BatchingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BatchingTest]/[test-factory:shouldProcessOnExactlyNThreads()]/[dynamic-test:#4]
negated conditional → KILLED

121

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#150]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED

124

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#55]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED

Active mutators

Tests examined


Report generated by PIT 1.22.0