1 | package com.pivovarit.collectors; | |
2 | ||
3 | import java.util.ArrayList; | |
4 | import java.util.Collection; | |
5 | import java.util.Collections; | |
6 | import java.util.List; | |
7 | import java.util.Set; | |
8 | import java.util.concurrent.CompletableFuture; | |
9 | import java.util.concurrent.Executor; | |
10 | import java.util.concurrent.Executors; | |
11 | import java.util.function.BiConsumer; | |
12 | import java.util.function.BinaryOperator; | |
13 | import java.util.function.Function; | |
14 | import java.util.function.Supplier; | |
15 | import java.util.stream.Collector; | |
16 | import java.util.stream.Stream; | |
17 | ||
18 | import static com.pivovarit.collectors.BatchingSpliterator.batching; | |
19 | import static com.pivovarit.collectors.BatchingSpliterator.partitioned; | |
20 | import static java.util.Objects.requireNonNull; | |
21 | import static java.util.concurrent.CompletableFuture.allOf; | |
22 | import static java.util.concurrent.CompletableFuture.supplyAsync; | |
23 | import static java.util.stream.Collectors.collectingAndThen; | |
24 | import static java.util.stream.Collectors.toList; | |
25 | ||
26 | /** | |
27 | * @author Grzegorz Piwowarek | |
28 | */ | |
29 | final class AsyncParallelCollector<T, R, C> | |
30 | implements Collector<T, List<CompletableFuture<R>>, CompletableFuture<C>> { | |
31 | ||
32 | private final Dispatcher<R> dispatcher; | |
33 | private final Function<T, R> task; | |
34 | private final Function<Stream<R>, C> finalizer; | |
35 | ||
36 | private AsyncParallelCollector( | |
37 | Function<T, R> task, | |
38 | Dispatcher<R> dispatcher, | |
39 | Function<Stream<R>, C> finalizer) { | |
40 | this.dispatcher = dispatcher; | |
41 | this.finalizer = finalizer; | |
42 | this.task = task; | |
43 | } | |
44 | ||
45 | @Override | |
46 | public Supplier<List<CompletableFuture<R>>> supplier() { | |
47 |
1
1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::supplier → KILLED |
return ArrayList::new; |
48 | } | |
49 | ||
50 | @Override | |
51 | public BinaryOperator<List<CompletableFuture<R>>> combiner() { | |
52 |
1
1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combiner → SURVIVED |
return (left, right) -> { |
53 | throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea"); | |
54 | }; | |
55 | } | |
56 | ||
57 | @Override | |
58 | public BiConsumer<List<CompletableFuture<R>>, T> accumulator() { | |
59 |
1
1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::accumulator → KILLED |
return (acc, e) -> { |
60 |
1
1. lambda$accumulator$2 : negated conditional → TIMED_OUT |
if (!dispatcher.isRunning()) { |
61 |
1
1. lambda$accumulator$2 : removed call to com/pivovarit/collectors/Dispatcher::start → TIMED_OUT |
dispatcher.start(); |
62 | } | |
63 |
1
1. lambda$accumulator$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$accumulator$1 → KILLED |
acc.add(dispatcher.enqueue(() -> task.apply(e))); |
64 | }; | |
65 | } | |
66 | ||
67 | @Override | |
68 | public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finisher() { | |
69 |
1
1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::finisher → KILLED |
return futures -> { |
70 |
1
1. lambda$finisher$3 : removed call to com/pivovarit/collectors/Dispatcher::stop → SURVIVED |
dispatcher.stop(); |
71 | ||
72 |
1
1. lambda$finisher$3 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finisher$3 → KILLED |
return combine(futures).thenApply(finalizer); |
73 | }; | |
74 | } | |
75 | ||
76 | @Override | |
77 | public Set<Characteristics> characteristics() { | |
78 | return Collections.emptySet(); | |
79 | } | |
80 | ||
81 | private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T>> futures) { | |
82 |
1
1. lambda$combine$4 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$4 → KILLED |
var combined = allOf(futures.toArray(CompletableFuture[]::new)) |
83 |
1
1. lambda$combine$5 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$5 → KILLED |
.thenApply(__ -> futures.stream().map(CompletableFuture::join)); |
84 | ||
85 | for (var future : futures) { | |
86 | future.whenComplete((o, ex) -> { | |
87 |
1
1. lambda$combine$6 : negated conditional → SURVIVED |
if (ex != null) { |
88 | combined.completeExceptionally(ex); | |
89 | } | |
90 | }); | |
91 | } | |
92 | ||
93 |
1
1. combine : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combine → KILLED |
return combined; |
94 | } | |
95 | ||
96 | static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper) { | |
97 | requireNonNull(mapper, "mapper can't be null"); | |
98 | ||
99 |
1
1. collectingToStream : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingToStream → KILLED |
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), Function.identity()); |
100 | } | |
101 | ||
102 | static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, int parallelism) { | |
103 | requireNonNull(mapper, "mapper can't be null"); | |
104 |
1
1. collectingToStream : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → SURVIVED |
requireValidParallelism(parallelism); |
105 | ||
106 |
1
1. collectingToStream : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingToStream → KILLED |
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), Function.identity()); |
107 | } | |
108 | ||
109 | static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) { | |
110 | requireNonNull(executor, "executor can't be null"); | |
111 | requireNonNull(mapper, "mapper can't be null"); | |
112 |
1
1. collectingToStream : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
113 | ||
114 |
2
1. collectingToStream : negated conditional → KILLED 2. collectingToStream : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingToStream → KILLED |
return parallelism == 1 |
115 |
1
1. lambda$collectingToStream$7 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingToStream$7 → KILLED |
? asyncCollector(mapper, executor, i -> i) |
116 | : new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), Function.identity()); | |
117 | } | |
118 | ||
119 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper) { | |
120 | requireNonNull(collector, "collector can't be null"); | |
121 | requireNonNull(mapper, "mapper can't be null"); | |
122 | ||
123 |
2
1. collectingWithCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingWithCollector → KILLED 2. lambda$collectingWithCollector$8 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$8 → KILLED |
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), s -> s.collect(collector)); |
124 | } | |
125 | ||
126 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, int parallelism) { | |
127 | requireNonNull(collector, "collector can't be null"); | |
128 | requireNonNull(mapper, "mapper can't be null"); | |
129 |
1
1. collectingWithCollector : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → SURVIVED |
requireValidParallelism(parallelism); |
130 | ||
131 |
2
1. collectingWithCollector : negated conditional → TIMED_OUT 2. collectingWithCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingWithCollector → KILLED |
return parallelism == 1 |
132 |
1
1. lambda$collectingWithCollector$9 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$9 → KILLED |
? asyncCollector(mapper, Executors.newVirtualThreadPerTaskExecutor(), s -> s.collect(collector)) |
133 |
1
1. lambda$collectingWithCollector$10 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$10 → KILLED |
: new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), s -> s.collect(collector)); |
134 | } | |
135 | ||
136 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) { | |
137 | requireNonNull(collector, "collector can't be null"); | |
138 | requireNonNull(executor, "executor can't be null"); | |
139 | requireNonNull(mapper, "mapper can't be null"); | |
140 |
1
1. collectingWithCollector : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
141 | ||
142 |
2
1. collectingWithCollector : negated conditional → TIMED_OUT 2. collectingWithCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingWithCollector → KILLED |
return parallelism == 1 |
143 |
1
1. lambda$collectingWithCollector$11 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$11 → KILLED |
? asyncCollector(mapper, executor, s -> s.collect(collector)) |
144 |
1
1. lambda$collectingWithCollector$12 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$12 → KILLED |
: new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), s -> s.collect(collector)); |
145 | } | |
146 | ||
147 | static void requireValidParallelism(int parallelism) { | |
148 |
2
1. requireValidParallelism : changed conditional boundary → KILLED 2. requireValidParallelism : negated conditional → KILLED |
if (parallelism < 1) { |
149 | throw new IllegalArgumentException("Parallelism can't be lower than 1"); | |
150 | } | |
151 | } | |
152 | ||
153 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) { | |
154 |
2
1. asyncCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::asyncCollector → KILLED 2. lambda$asyncCollector$14 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$asyncCollector$14 → KILLED |
return collectingAndThen(toList(), list -> supplyAsync(() -> { |
155 | Stream.Builder<R> acc = Stream.builder(); | |
156 | for (T t : list) { | |
157 | acc.add(mapper.apply(t)); | |
158 | } | |
159 |
1
1. lambda$asyncCollector$13 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$asyncCollector$13 → KILLED |
return finisher.apply(acc.build()); |
160 | }, executor)); | |
161 | } | |
162 | ||
163 | static final class BatchingCollectors { | |
164 | ||
165 | private BatchingCollectors() { | |
166 | } | |
167 | ||
168 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) { | |
169 | requireNonNull(collector, "collector can't be null"); | |
170 | requireNonNull(executor, "executor can't be null"); | |
171 | requireNonNull(mapper, "mapper can't be null"); | |
172 |
1
1. collectingWithCollector : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
173 | ||
174 |
2
1. collectingWithCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::collectingWithCollector → KILLED 2. collectingWithCollector : negated conditional → KILLED |
return parallelism == 1 |
175 |
1
1. lambda$collectingWithCollector$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingWithCollector$0 → KILLED |
? asyncCollector(mapper, executor, s -> s.collect(collector)) |
176 |
1
1. lambda$collectingWithCollector$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingWithCollector$1 → KILLED |
: batchingCollector(mapper, executor, parallelism, s -> s.collect(collector)); |
177 | } | |
178 | ||
179 | static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream( | |
180 | Function<T, R> mapper, | |
181 | Executor executor, int parallelism) { | |
182 | requireNonNull(executor, "executor can't be null"); | |
183 | requireNonNull(mapper, "mapper can't be null"); | |
184 |
1
1. collectingToStream : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
185 | ||
186 |
2
1. collectingToStream : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::collectingToStream → KILLED 2. collectingToStream : negated conditional → KILLED |
return parallelism == 1 |
187 |
1
1. lambda$collectingToStream$2 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingToStream$2 → KILLED |
? asyncCollector(mapper, executor, i -> i) |
188 |
1
1. lambda$collectingToStream$3 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingToStream$3 → KILLED |
: batchingCollector(mapper, executor, parallelism, s -> s); |
189 | } | |
190 | ||
191 | private static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism, Function<Stream<R>, RR> finisher) { | |
192 |
1
1. batchingCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::batchingCollector → KILLED |
return collectingAndThen( |
193 | toList(), | |
194 | list -> { | |
195 | // no sense to repack into batches of size 1 | |
196 |
1
1. lambda$batchingCollector$5 : negated conditional → KILLED |
if (list.size() == parallelism) { |
197 |
1
1. lambda$batchingCollector$5 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$batchingCollector$5 → KILLED |
return list.stream() |
198 | .collect(new AsyncParallelCollector<>( | |
199 | mapper, | |
200 | Dispatcher.from(executor, parallelism), | |
201 | finisher)); | |
202 | } else { | |
203 |
1
1. lambda$batchingCollector$5 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$batchingCollector$5 → KILLED |
return partitioned(list, parallelism) |
204 | .collect(new AsyncParallelCollector<>( | |
205 | batching(mapper), | |
206 | Dispatcher.from(executor, parallelism), | |
207 |
1
1. lambda$batchingCollector$4 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$batchingCollector$4 → KILLED |
listStream -> finisher.apply(listStream.flatMap(Collection::stream)))); |
208 | } | |
209 | }); | |
210 | } | |
211 | } | |
212 | } | |
Mutations | ||
47 |
1.1 |
|
52 |
1.1 |
|
59 |
1.1 |
|
60 |
1.1 |
|
61 |
1.1 |
|
63 |
1.1 |
|
69 |
1.1 |
|
70 |
1.1 |
|
72 |
1.1 |
|
82 |
1.1 |
|
83 |
1.1 |
|
87 |
1.1 |
|
93 |
1.1 |
|
99 |
1.1 |
|
104 |
1.1 |
|
106 |
1.1 |
|
112 |
1.1 |
|
114 |
1.1 2.2 |
|
115 |
1.1 |
|
123 |
1.1 2.2 |
|
129 |
1.1 |
|
131 |
1.1 2.2 |
|
132 |
1.1 |
|
133 |
1.1 |
|
140 |
1.1 |
|
142 |
1.1 2.2 |
|
143 |
1.1 |
|
144 |
1.1 |
|
148 |
1.1 2.2 |
|
154 |
1.1 2.2 |
|
159 |
1.1 |
|
172 |
1.1 |
|
174 |
1.1 2.2 |
|
175 |
1.1 |
|
176 |
1.1 |
|
184 |
1.1 |
|
186 |
1.1 2.2 |
|
187 |
1.1 |
|
188 |
1.1 |
|
192 |
1.1 |
|
196 |
1.1 |
|
197 |
1.1 |
|
203 |
1.1 |
|
207 |
1.1 |