AsyncParallelCollector.java

1
package com.pivovarit.collectors;
2
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.Collections;
6
import java.util.List;
7
import java.util.Set;
8
import java.util.concurrent.CompletableFuture;
9
import java.util.concurrent.Executor;
10
import java.util.function.BiConsumer;
11
import java.util.function.BinaryOperator;
12
import java.util.function.Function;
13
import java.util.function.Supplier;
14
import java.util.stream.Collector;
15
import java.util.stream.Stream;
16
17
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
18
import static java.util.concurrent.CompletableFuture.allOf;
19
20
/**
21
 * @author Grzegorz Piwowarek
22
 */
23
final class AsyncParallelCollector<T, R, C>
24
  implements Collector<T, List<CompletableFuture<R>>, CompletableFuture<C>> {
25
26
    private final Dispatcher<R> dispatcher;
27
    private final Function<? super T, ? extends R> task;
28
    private final Function<Stream<R>, C> finalizer;
29
30
    private AsyncParallelCollector(
31
      Function<? super T, ? extends R> task,
32
      Dispatcher<R> dispatcher,
33
      Function<Stream<R>, C> finalizer) {
34
        this.dispatcher = dispatcher;
35
        this.finalizer = finalizer;
36
        this.task = task;
37
    }
38
39
    public static <T,R, C> Collector<T, ?, CompletableFuture<C>> from(
40
      Function<? super T, ? extends R> task,
41
      Function<Stream<R>, C> finalizer,
42
      Executor executor,
43
      int parallelism) {
44 1 1. from : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::from → KILLED
        return new AsyncParallelCollector<>(task, new Dispatcher<>(executor, parallelism), finalizer);
45
    }
46
47
    public static <T,R, C> Collector<T, ?, CompletableFuture<C>> from(
48
      Function<? super T, ? extends R> task,
49
      Function<Stream<R>, C> finalizer,
50
      Executor executor) {
51 1 1. from : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::from → KILLED
        return new AsyncParallelCollector<>(task, new Dispatcher<>(executor), finalizer);
52
    }
53
54
    @Override
55
    public Supplier<List<CompletableFuture<R>>> supplier() {
56 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::supplier → KILLED
        return ArrayList::new;
57
    }
58
59
    @Override
60
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
61 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combiner → SURVIVED
        return (left, right) -> {
62
            throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
63
        };
64
    }
65
66
    @Override
67
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
68 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::accumulator → KILLED
        return (acc, e) -> {
69 1 1. lambda$accumulator$0 : negated conditional → TIMED_OUT
            if (!dispatcher.isRunning()) {
70 1 1. lambda$accumulator$0 : removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT
                dispatcher.start();
71
            }
72 1 1. lambda$accumulator$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$accumulator$1 → KILLED
            acc.add(dispatcher.enqueue(() -> task.apply(e)));
73
        };
74
    }
75
76
    @Override
77
    public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finisher() {
78 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::finisher → KILLED
        return futures -> {
79 1 1. lambda$finisher$0 : removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
            dispatcher.stop();
80
81 1 1. lambda$finisher$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finisher$0 → KILLED
            return combine(futures).thenApply(finalizer);
82
        };
83
    }
84
85
    @Override
86
    public Set<Characteristics> characteristics() {
87
        return Collections.emptySet();
88
    }
89
90
    private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T>> futures) {
91 1 1. lambda$combine$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$0 → KILLED
        var combined = allOf(futures.toArray(CompletableFuture[]::new))
92 1 1. lambda$combine$1 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$1 → KILLED
          .thenApply(__ -> futures.stream().map(CompletableFuture::join));
93
94
        for (var future : futures) {
95
            future.whenComplete((o, ex) -> {
96 1 1. lambda$combine$2 : negated conditional → SURVIVED
                if (ex != null) {
97
                    combined.completeExceptionally(ex);
98
                }
99
            });
100
        }
101
102 1 1. combine : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combine → KILLED
        return combined;
103
    }
104
105
    record BatchingCollector<T, R, C>(Function<? super T, ? extends R> task, Function<Stream<R>, C> finalizer, Executor executor, int parallelism)
106
      implements Collector<T, ArrayList<T>, CompletableFuture<C>> {
107
108
        @Override
109
        public Supplier<ArrayList<T>> supplier() {
110 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::supplier → KILLED
            return ArrayList::new;
111
        }
112
113
        @Override
114
        public BiConsumer<ArrayList<T>, T> accumulator() {
115 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::accumulator → KILLED
            return ArrayList::add;
116
        }
117
118
        @Override
119
        public BinaryOperator<ArrayList<T>> combiner() {
120 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::combiner → SURVIVED
            return (left, right) -> {
121
                left.addAll(right);
122 1 1. lambda$combiner$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$combiner$0 → NO_COVERAGE
                return left;
123
            };
124
        }
125
126
        @Override
127
        public Function<ArrayList<T>, CompletableFuture<C>> finisher() {
128 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::finisher → KILLED
            return items -> {
129 1 1. lambda$finisher$0 : negated conditional → KILLED
                if (items.size() == parallelism) {
130 1 1. lambda$finisher$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return items.stream()
131
                      .collect(AsyncParallelCollector.from(task, finalizer, executor, parallelism));
132
                } else {
133 1 1. lambda$finisher$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return partitioned(items, parallelism)
134
                      .collect(AsyncParallelCollector.from(batch -> {
135
                          List<R> list = new ArrayList<>(batch.size());
136
                          for (T t : batch) {
137
                              list.add(task.apply(t));
138
                          }
139 1 1. lambda$finisher$1 : replaced return value with Collections.emptyList for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$1 → KILLED
                          return list;
140 1 1. lambda$finisher$2 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$2 → KILLED
                      }, r -> finalizer.apply(r.flatMap(Collection::stream)), executor, parallelism));
141
                }
142
            };
143
        }
144
145
        @Override
146
        public Set<Characteristics> characteristics() {
147
            return Collections.emptySet();
148
        }
149
    }
150
}

Mutations

44

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#36]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::from → KILLED

51

1.1
Location : from
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::from → KILLED

56

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::supplier → KILLED

61

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combiner → SURVIVED
Covering tests

68

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.NonBlockingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.NonBlockingTest]/[test-factory:shouldNotBlockTheCallingThread()]/[dynamic-test:#2]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::accumulator → KILLED

69

1.1
Location : lambda$accumulator$0
Killed by : none
negated conditional → TIMED_OUT

70

1.1
Location : lambda$accumulator$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT

72

1.1
Location : lambda$accumulator$1
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$accumulator$1 → KILLED

78

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::finisher → KILLED

79

1.1
Location : lambda$finisher$0
Killed by : none
removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED
Covering tests

81

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finisher$0 → KILLED

91

1.1
Location : lambda$combine$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$0 → KILLED

92

1.1
Location : lambda$combine$1
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#5]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$1 → KILLED

96

1.1
Location : lambda$combine$2
Killed by : none
negated conditional → SURVIVED
Covering tests

102

1.1
Location : combine
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combine → KILLED

110

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#27]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::supplier → KILLED

115

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.NonBlockingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.NonBlockingTest]/[test-factory:shouldNotBlockTheCallingThread()]/[dynamic-test:#7]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::accumulator → KILLED

120

1.1
Location : combiner
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::combiner → SURVIVED
Covering tests

122

1.1
Location : lambda$combiner$0
Killed by : none
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$combiner$0 → NO_COVERAGE

128

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#27]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::finisher → KILLED

129

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BatchingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BatchingTest]/[test-factory:shouldProcessOnExactlyNThreads()]/[dynamic-test:#2]
negated conditional → KILLED

130

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#3]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED

133

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#27]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED

139

1.1
Location : lambda$finisher$1
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#38]
replaced return value with Collections.emptyList for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$1 → KILLED

140

1.1
Location : lambda$finisher$2
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#16]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$2 → KILLED

Active mutators

Tests examined


Report generated by PIT 1.19.4