package com.pivovarit.collectors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.BatchingSpliterator.batching;
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
import static com.pivovarit.collectors.Dispatcher.getDefaultParallelism;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
/**
 * A {@link Collector} that maps elements asynchronously on a given {@link Executor}
 * and exposes the combined result as a {@link CompletableFuture}.
 *
 * @author Grzegorz Piwowarek
 */
final class AsyncParallelCollector<T, R, C>
  implements Collector<T, List<CompletableFuture<R>>, CompletableFuture<C>> {
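    // Usage sketch (illustrative only): the factory methods below are package-private, so a caller
    // inside this package could collect as shown here; `userIds`, `fetchUser` and `executor` are
    // hypothetical placeholders, not part of this class.
    //
    //   CompletableFuture<List<User>> users = userIds.stream()
    //       .collect(collectingWithCollector(toList(), id -> fetchUser(id), executor, 4));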

    private final Dispatcher<R> dispatcher;
    private final Function<T, R> mapper;
    private final Function<Stream<R>, C> processor;

    private AsyncParallelCollector(
      Function<T, R> mapper,
      Dispatcher<R> dispatcher,
      Function<Stream<R>, C> processor) {
        this.dispatcher = dispatcher;
        this.processor = processor;
        this.mapper = mapper;
    }

    @Override
    public Supplier<List<CompletableFuture<R>>> supplier() {
        return ArrayList::new;
    }

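    // The combiner is only invoked when the collector is used with a parallel Stream. Parallelism
    // is already provided by the dispatcher/executor here, so combining partial results is
    // deliberately unsupported and fails fast.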
    @Override
    public BinaryOperator<List<CompletableFuture<R>>> combiner() {
        return (left, right) -> {
            throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
        };
    }

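    // Each element becomes a task that applies the mapper; the resulting CompletableFuture is added
    // to the mutable accumulation list. The dispatcher is started lazily, on the first element.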
    @Override
    public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
        return (acc, e) -> {
            if (!dispatcher.isRunning()) {
                dispatcher.start();
            }
            acc.add(dispatcher.enqueue(() -> mapper.apply(e)));
        };
    }

    @Override
    public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finisher() {
        return futures -> {
            dispatcher.stop();

            return combine(futures).thenApply(processor);
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }

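    // allOf(...) only completes once every future has finished, so each future additionally
    // registers an exceptionally-callback that completes the combined future early on the
    // first failure instead of waiting for the remaining tasks.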
    @SuppressWarnings("unchecked")
    private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T>> futures) {
        CompletableFuture<T>[] futuresArray = (CompletableFuture<T>[]) futures.toArray(new CompletableFuture[0]);
        CompletableFuture<Stream<T>> combined = allOf(futuresArray)
          .thenApply(__ -> Arrays.stream(futuresArray).map(CompletableFuture::join));

        for (CompletableFuture<?> f : futuresArray) {
            f.exceptionally(ex -> {
                combined.completeExceptionally(ex);
                return null;
            });
        }

        return combined;
    }

    static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor) {
        return collectingToStream(mapper, executor, getDefaultParallelism());
    }

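    // With parallelism == 1 there is nothing to coordinate, so the work is delegated to
    // asyncCollector (a single async task, see below); otherwise a Dispatcher bounded by the
    // requested parallelism drives the individual tasks.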
    static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) {
        requireNonNull(executor, "executor can't be null");
        requireNonNull(mapper, "mapper can't be null");
        requireValidParallelism(parallelism);

        return parallelism == 1
          ? asyncCollector(mapper, executor, i -> i)
          : new AsyncParallelCollector<>(mapper, Dispatcher.of(executor, parallelism), t -> t);
    }

    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor) {
        return collectingWithCollector(collector, mapper, executor, getDefaultParallelism());
    }

    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
        requireNonNull(collector, "collector can't be null");
        requireNonNull(executor, "executor can't be null");
        requireNonNull(mapper, "mapper can't be null");
        requireValidParallelism(parallelism);

        return parallelism == 1
          ? asyncCollector(mapper, executor, s -> s.collect(collector))
          : new AsyncParallelCollector<>(mapper, Dispatcher.of(executor, parallelism), s -> s.collect(collector));
    }

    static void requireValidParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("Parallelism can't be lower than 1");
        }
    }

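    // Sequential fallback used by the parallelism == 1 branches above: the upstream is first
    // collected into a List, then mapped and finished in a single supplyAsync task on the given
    // executor, so the result is still exposed as a CompletableFuture.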
    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) {
        return collectingAndThen(toList(), list -> supplyAsync(() -> {
            Stream.Builder<R> acc = Stream.builder();
            for (T t : list) {
                acc.add(mapper.apply(t));
            }
            return finisher.apply(acc.build());
        }, executor));
    }

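    // Batching variants: instead of submitting one task per element, the input is split into at
    // most `parallelism` batches (BatchingSpliterator.partitioned) and each batch is processed by
    // a single task (BatchingSpliterator.batching), which reduces per-element scheduling overhead.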
    static final class BatchingCollectors {

        private BatchingCollectors() {
        }

        static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
            requireNonNull(collector, "collector can't be null");
            requireNonNull(executor, "executor can't be null");
            requireNonNull(mapper, "mapper can't be null");
            requireValidParallelism(parallelism);

            return parallelism == 1
              ? asyncCollector(mapper, executor, s -> s.collect(collector))
              : batchingCollector(mapper, executor, parallelism, s -> s.collect(collector));
        }

        static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(
          Function<T, R> mapper,
          Executor executor, int parallelism) {
            requireNonNull(executor, "executor can't be null");
            requireNonNull(mapper, "mapper can't be null");
            requireValidParallelism(parallelism);

            return parallelism == 1
              ? asyncCollector(mapper, executor, i -> i)
              : batchingCollector(mapper, executor, parallelism, s -> s);
        }

        private static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism, Function<Stream<R>, RR> finisher) {
            return collectingAndThen(
              toList(),
              list -> {
                  // one element per batch, so there's no point in repacking the list into batches
                  if (list.size() == parallelism) {
                      return list.stream()
                        .collect(new AsyncParallelCollector<>(
                          mapper,
                          Dispatcher.of(executor, parallelism),
                          finisher));
                  } else {
                      return partitioned(list, parallelism)
                        .collect(new AsyncParallelCollector<>(
                          batching(mapper),
                          Dispatcher.of(executor, parallelism),
                          listStream -> finisher.apply(listStream.flatMap(Collection::stream))));
                  }
              });
        }
    }
}