package com.pivovarit.collectors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.BatchingSpliterator.batching;
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;

/**
 * @author Grzegorz Piwowarek
 */
28 | final class AsyncParallelCollector<T, R, C> | |
29 | implements Collector<T, List<CompletableFuture<R>>, CompletableFuture<C>> { | |
30 | ||
31 | private final Dispatcher<R> dispatcher; | |
32 | private final Function<T, R> task; | |
33 | private final Function<Stream<R>, C> finalizer; | |
34 | ||
35 | private AsyncParallelCollector( | |
36 | Function<T, R> task, | |
37 | Dispatcher<R> dispatcher, | |
38 | Function<Stream<R>, C> finalizer) { | |
39 | this.dispatcher = dispatcher; | |
40 | this.finalizer = finalizer; | |
41 | this.task = task; | |
42 | } | |
43 | ||
44 | @Override | |
45 | public Supplier<List<CompletableFuture<R>>> supplier() { | |
46 |
1
1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::supplier → KILLED |
return ArrayList::new; |
47 | } | |
48 | ||
49 | @Override | |
50 | public BinaryOperator<List<CompletableFuture<R>>> combiner() { | |
51 |
1
1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combiner → SURVIVED |
return (left, right) -> { |
52 | throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea"); | |
53 | }; | |
54 | } | |
55 | ||
56 | @Override | |
57 | public BiConsumer<List<CompletableFuture<R>>, T> accumulator() { | |
58 |
2
1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::accumulator → KILLED 2. lambda$accumulator$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$accumulator$1 → KILLED |
return (acc, e) -> acc.add(dispatcher.enqueue(() -> task.apply(e))); |
59 | } | |
60 | ||
61 | @Override | |
62 | public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finisher() { | |
63 |
2
1. lambda$finisher$3 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finisher$3 → KILLED 2. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::finisher → KILLED |
return futures -> combine(futures).thenApply(finalizer); |
64 | } | |
65 | ||
66 | @Override | |
67 | public Set<Characteristics> characteristics() { | |
68 | return Collections.emptySet(); | |
69 | } | |
70 | ||
71 | private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T>> futures) { | |
72 |
1
1. lambda$combine$4 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$4 → KILLED |
var combined = allOf(futures.toArray(CompletableFuture[]::new)) |
73 |
1
1. lambda$combine$5 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$combine$5 → KILLED |
.thenApply(__ -> futures.stream().map(CompletableFuture::join)); |
74 | ||
75 | for (var future : futures) { | |
76 | future.whenComplete((o, ex) -> { | |
77 |
1
1. lambda$combine$6 : negated conditional → SURVIVED |
if (ex != null) { |
78 | combined.completeExceptionally(ex); | |
79 | } | |
80 | }); | |
81 | } | |
82 | ||
83 |
1
1. combine : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::combine → KILLED |
return combined; |
84 | } | |
85 | ||
86 | static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper) { | |
87 | requireNonNull(mapper, "mapper can't be null"); | |
88 | ||
89 |
1
1. collectingToStream : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingToStream → KILLED |
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), Function.identity()); |
90 | } | |
91 | ||
92 | static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) { | |
93 | requireNonNull(executor, "executor can't be null"); | |
94 | requireNonNull(mapper, "mapper can't be null"); | |
95 |
1
1. collectingToStream : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
96 | ||
97 |
2
1. collectingToStream : negated conditional → KILLED 2. collectingToStream : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingToStream → KILLED |
return parallelism == 1 |
98 |
1
1. lambda$collectingToStream$7 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingToStream$7 → KILLED |
? asyncCollector(mapper, executor, i -> i) |
99 | : new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), Function.identity()); | |
100 | } | |
101 | ||
102 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper) { | |
103 | requireNonNull(collector, "collector can't be null"); | |
104 | requireNonNull(mapper, "mapper can't be null"); | |
105 | ||
106 |
2
1. collectingWithCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingWithCollector → KILLED 2. lambda$collectingWithCollector$8 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$8 → KILLED |
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(),s -> s.collect(collector)); |
107 | } | |
108 | ||
109 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) { | |
110 | requireNonNull(collector, "collector can't be null"); | |
111 | requireNonNull(executor, "executor can't be null"); | |
112 | requireNonNull(mapper, "mapper can't be null"); | |
113 |
1
1. collectingWithCollector : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
114 | ||
115 |
2
1. collectingWithCollector : negated conditional → TIMED_OUT 2. collectingWithCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::collectingWithCollector → KILLED |
return parallelism == 1 |
116 |
1
1. lambda$collectingWithCollector$9 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$9 → KILLED |
? asyncCollector(mapper, executor, s -> s.collect(collector)) |
117 |
1
1. lambda$collectingWithCollector$10 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$collectingWithCollector$10 → KILLED |
: new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), s -> s.collect(collector)); |
118 | } | |
119 | ||
120 | static void requireValidParallelism(int parallelism) { | |
121 |
2
1. requireValidParallelism : changed conditional boundary → KILLED 2. requireValidParallelism : negated conditional → KILLED |
if (parallelism < 1) { |
122 | throw new IllegalArgumentException("Parallelism can't be lower than 1"); | |
123 | } | |
124 | } | |
125 | ||
126 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) { | |
127 |
2
1. asyncCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::asyncCollector → KILLED 2. lambda$asyncCollector$12 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$asyncCollector$12 → KILLED |
return collectingAndThen(toList(), list -> supplyAsync(() -> { |
128 | Stream.Builder<R> acc = Stream.builder(); | |
129 | for (T t : list) { | |
130 | acc.add(mapper.apply(t)); | |
131 | } | |
132 |
1
1. lambda$asyncCollector$11 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$asyncCollector$11 → KILLED |
return finisher.apply(acc.build()); |
133 | }, executor)); | |
134 | } | |
135 | ||
136 | static final class BatchingCollectors { | |
137 | ||
138 | private BatchingCollectors() { | |
139 | } | |
140 | ||
141 | static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) { | |
142 | requireNonNull(collector, "collector can't be null"); | |
143 | requireNonNull(executor, "executor can't be null"); | |
144 | requireNonNull(mapper, "mapper can't be null"); | |
145 |
1
1. collectingWithCollector : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
146 | ||
147 |
2
1. collectingWithCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::collectingWithCollector → KILLED 2. collectingWithCollector : negated conditional → KILLED |
return parallelism == 1 |
148 |
1
1. lambda$collectingWithCollector$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingWithCollector$0 → KILLED |
? asyncCollector(mapper, executor, s -> s.collect(collector)) |
149 |
1
1. lambda$collectingWithCollector$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingWithCollector$1 → KILLED |
: batchingCollector(mapper, executor, parallelism, s -> s.collect(collector)); |
150 | } | |
151 | ||
152 | static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream( | |
153 | Function<T, R> mapper, | |
154 | Executor executor, int parallelism) { | |
155 | requireNonNull(executor, "executor can't be null"); | |
156 | requireNonNull(mapper, "mapper can't be null"); | |
157 |
1
1. collectingToStream : removed call to com/pivovarit/collectors/AsyncParallelCollector::requireValidParallelism → KILLED |
requireValidParallelism(parallelism); |
158 | ||
159 |
2
1. collectingToStream : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::collectingToStream → KILLED 2. collectingToStream : negated conditional → KILLED |
return parallelism == 1 |
160 |
1
1. lambda$collectingToStream$2 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingToStream$2 → KILLED |
? asyncCollector(mapper, executor, i -> i) |
161 |
1
1. lambda$collectingToStream$3 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$collectingToStream$3 → KILLED |
: batchingCollector(mapper, executor, parallelism, s -> s); |
162 | } | |
163 | ||
164 | private static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism, Function<Stream<R>, RR> finisher) { | |
165 |
1
1. batchingCollector : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::batchingCollector → KILLED |
return collectingAndThen( |
166 | toList(), | |
167 | list -> { | |
168 | // no sense to repack into batches of size 1 | |
169 |
1
1. lambda$batchingCollector$5 : negated conditional → KILLED |
if (list.size() == parallelism) { |
170 |
1
1. lambda$batchingCollector$5 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$batchingCollector$5 → KILLED |
return list.stream() |
171 | .collect(new AsyncParallelCollector<>( | |
172 | mapper, | |
173 | Dispatcher.from(executor, parallelism), | |
174 | finisher)); | |
175 | } else { | |
176 |
1
1. lambda$batchingCollector$5 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$batchingCollector$5 → KILLED |
return partitioned(list, parallelism) |
177 | .collect(new AsyncParallelCollector<>( | |
178 | batching(mapper), | |
179 | Dispatcher.from(executor, parallelism), | |
180 |
1
1. lambda$batchingCollector$4 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollectors::lambda$batchingCollector$4 → KILLED |
listStream -> finisher.apply(listStream.flatMap(Collection::stream)))); |
181 | } | |
182 | }); | |
183 | } | |
184 | } | |
185 | } | |
Mutations | ||
46 |
1.1 |
|
51 |
1.1 |
|
58 |
1.1 2.2 |
|
63 |
1.1 2.2 |
|
72 |
1.1 |
|
73 |
1.1 |
|
77 |
1.1 |
|
83 |
1.1 |
|
89 |
1.1 |
|
95 |
1.1 |
|
97 |
1.1 2.2 |
|
98 |
1.1 |
|
106 |
1.1 2.2 |
|
113 |
1.1 |
|
115 |
1.1 2.2 |
|
116 |
1.1 |
|
117 |
1.1 |
|
121 |
1.1 2.2 |
|
127 |
1.1 2.2 |
|
132 |
1.1 |
|
145 |
1.1 |
|
147 |
1.1 2.2 |
|
148 |
1.1 |
|
149 |
1.1 |
|
157 |
1.1 |
|
159 |
1.1 2.2 |
|
160 |
1.1 |
|
161 |
1.1 |
|
165 |
1.1 |
|
169 |
1.1 |
|
170 |
1.1 |
|
176 |
1.1 |
|
180 |
1.1 |