ParallelCollectors.java

1
package com.pivovarit.collectors;
2
3
import java.util.List;
4
import java.util.concurrent.CompletableFuture;
5
import java.util.concurrent.Executor;
6
import java.util.function.Function;
7
import java.util.stream.Collector;
8
import java.util.stream.Stream;
9
10
/**
11
 * An umbrella class exposing static factory methods for instantiating parallel {@link Collector}s
12
 *
13
 * @author Grzegorz Piwowarek
14
 */
15
public final class ParallelCollectors {
16
17
    private ParallelCollectors() {
18
    }
19
20
21
    /**
22
     * A convenience {@link Collector} used for executing parallel computations using Virtual Threads
23
     * and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
24
     *
25
     * <br>
26
     * Example:
27
     * <pre>{@code
28
     * CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
29
     *   .collect(parallel(i -> foo(i), toList()));
30
     * }</pre>
31
     *
32
     * @param mapper      a transformation to be performed in parallel
33
     * @param collector   the {@code Collector} describing the reduction
34
     * @param <T>         the type of the collected elements
35
     * @param <R>         the result returned by {@code mapper}
36
     * @param <RR>        the reduction result {@code collector}
37
     *
38
     * @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
39
     *
40
     * @since 3.0.0
41
     */
42
    public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector) {
43 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED
        return AsyncParallelCollector.collectingWithCollector(collector, mapper);
44
    }
45
46
    /**
47
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
48
     * and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
49
     *
50
     * <br>
51
     * Example:
52
     * <pre>{@code
53
     * CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
54
     *   .collect(parallel(i -> foo(i), toList(), executor, 2));
55
     * }</pre>
56
     *
57
     * @param mapper      a transformation to be performed in parallel
58
     * @param collector   the {@code Collector} describing the reduction
59
     * @param executor    the {@code Executor} to use for asynchronous execution
60
     * @param <T>         the type of the collected elements
61
     * @param <R>         the result returned by {@code mapper}
62
     * @param <RR>        the reduction result {@code collector}
63
     * @param parallelism the max parallelism level
64
     *
65
     * @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
66
     *
67
     * @since 2.0.0
68
     */
69
    public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, Executor executor, int parallelism) {
70 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED
        return AsyncParallelCollector.collectingWithCollector(collector, mapper, executor, parallelism);
71
    }
72
73
    /**
74
     * A convenience {@link Collector} used for executing parallel computations using Virtual Threads
75
     * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
76
     *
77
     * <br><br>
78
     * The collector maintains the order of processed {@link Stream}. Instances should not be reused.
79
     *
80
     * <br>
81
     * Example:
82
     * <pre>{@code
83
     * CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
84
     *   .collect(parallel(i -> foo()));
85
     * }</pre>
86
     *
87
     * @param mapper      a transformation to be performed in parallel
88
     * @param <T>         the type of the collected elements
89
     * @param <R>         the result returned by {@code mapper}
90
     *
91
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
92
     *
93
     * @since 3.0.0
94
     */
95
    public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper) {
96 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED
        return AsyncParallelCollector.collectingToStream(mapper);
97
    }
98
99
    /**
100
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
101
     * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
102
     *
103
     * <br><br>
104
     * The collector maintains the order of processed {@link Stream}. Instances should not be reused.
105
     *
106
     * <br>
107
     * Example:
108
     * <pre>{@code
109
     * CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
110
     *   .collect(parallel(i -> foo(), executor, 2));
111
     * }</pre>
112
     *
113
     * @param mapper      a transformation to be performed in parallel
114
     * @param executor    the {@code Executor} to use for asynchronous execution
115
     * @param parallelism the max parallelism level
116
     * @param <T>         the type of the collected elements
117
     * @param <R>         the result returned by {@code mapper}
118
     *
119
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
120
     *
121
     * @since 2.0.0
122
     */
123
    public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, Executor executor, int parallelism) {
124 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED
        return AsyncParallelCollector.collectingToStream(mapper, executor, parallelism);
125
    }
126
127
    /**
128
     * A convenience {@link Collector} used for executing parallel computations using Virtual Threads
129
     * and returning a {@link Stream} instance returning results as they arrive.
130
     * <p>
131
     * For the parallelism of 1, the stream is executed by the calling thread.
132
     *
133
     * <br>
134
     * Example:
135
     * <pre>{@code
136
     * Stream.of(1, 2, 3)
137
     *   .collect(parallelToStream(i -> foo()))
138
     *   .forEach(System.out::println);
139
     * }</pre>
140
     *
141
     * @param mapper      a transformation to be performed in parallel
142
     * @param <T>         the type of the collected elements
143
     * @param <R>         the result returned by {@code mapper}
144
     *
145
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
146
     *
147
     * @since 3.0.0
148
     */
149
    public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper) {
150 1 1. parallelToStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → KILLED
        return ParallelStreamCollector.streaming(mapper);
151
    }
152
153
    /**
154
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
155
     * and returning a {@link Stream} instance returning results as they arrive.
156
     * <p>
157
     * For the parallelism of 1, the stream is executed by the calling thread.
158
     *
159
     * <br>
160
     * Example:
161
     * <pre>{@code
162
     * Stream.of(1, 2, 3)
163
     *   .collect(parallelToStream(i -> foo(), executor, 2))
164
     *   .forEach(System.out::println);
165
     * }</pre>
166
     *
167
     * @param mapper      a transformation to be performed in parallel
168
     * @param executor    the {@code Executor} to use for asynchronous execution
169
     * @param parallelism the max parallelism level
170
     * @param <T>         the type of the collected elements
171
     * @param <R>         the result returned by {@code mapper}
172
     *
173
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
174
     *
175
     * @since 2.0.0
176
     */
177
    public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, Executor executor, int parallelism) {
178 1 1. parallelToStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → KILLED
        return ParallelStreamCollector.streaming(mapper, executor, parallelism);
179
    }
180
181
    /**
182
     * A convenience {@link Collector} used for executing parallel computations using Virtual Threads
183
     * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
184
     * <p>
185
     * For the parallelism of 1, the stream is executed by the calling thread.
186
     *
187
     * <br>
188
     * Example:
189
     * <pre>{@code
190
     * Stream.of(1, 2, 3)
191
     *   .collect(parallelToOrderedStream(i -> foo()))
192
     *   .forEach(System.out::println);
193
     * }</pre>
194
     *
195
     * @param mapper      a transformation to be performed in parallel
196
     * @param <T>         the type of the collected elements
197
     * @param <R>         the result returned by {@code mapper}
198
     *
199
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
200
     *
201
     * @since 3.0.0
202
     */
203
    public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper) {
204 1 1. parallelToOrderedStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → KILLED
        return ParallelStreamCollector.streamingOrdered(mapper);
205
    }
206
207
    /**
208
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
209
     * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
210
     * <p>
211
     * For the parallelism of 1, the stream is executed by the calling thread.
212
     *
213
     * <br>
214
     * Example:
215
     * <pre>{@code
216
     * Stream.of(1, 2, 3)
217
     *   .collect(parallelToOrderedStream(i -> foo(), executor, 2))
218
     *   .forEach(System.out::println);
219
     * }</pre>
220
     *
221
     * @param mapper      a transformation to be performed in parallel
222
     * @param executor    the {@code Executor} to use for asynchronous execution
223
     * @param parallelism the max parallelism level
224
     * @param <T>         the type of the collected elements
225
     * @param <R>         the result returned by {@code mapper}
226
     *
227
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
228
     *
229
     * @since 2.0.0
230
     */
231
    public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, Executor executor, int parallelism) {
232 1 1. parallelToOrderedStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → KILLED
        return ParallelStreamCollector.streamingOrdered(mapper, executor, parallelism);
233
    }
234
235
    /**
236
     * A convenience {@code Collector} for collecting a {@code Stream<CompletableFuture<T>>}
237
     * into a {@code CompletableFuture<R>} using a provided {@code Collector<T, ?, R>}
238
     *
239
     * @param collector the {@code Collector} describing the reduction
240
     * @param <T>       the type of the collected elements
241
     * @param <R>       the result of the transformation
242
     *
243
     * @return a {@code Collector} which collects all futures and combines them into a single future
244
     * using the provided downstream {@code Collector}
245
     *
246
     * @since 2.3.0
247
     */
248
    public static <T, R> Collector<CompletableFuture<T>, ?, CompletableFuture<R>> toFuture(Collector<T, ?, R> collector) {
249 1 1. toFuture : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED
        return FutureCollectors.toFuture(collector);
250
    }
251
252
    /**
253
     * A convenience {@code Collector} for collecting a {@code Stream<CompletableFuture<T>>} into a {@code CompletableFuture<List<T>>}
254
     *
255
     * @param <T> the type of the collected elements
256
     *
257
     * @return a {@code Collector} which collects all futures and combines them into a single future
258
     * returning a list of results
259
     *
260
     * @since 2.3.0
261
     */
262
    public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> toFuture() {
263 1 1. toFuture : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED
        return FutureCollectors.toFuture();
264
    }
265
266
    /**
267
     * A subset of collectors which perform operations in batches and not separately (one object in a thread pool's worker queue represents a batch of operations to be performed by a single thread)
268
     */
269
    public static final class Batching {
270
271
        private Batching() {
272
        }
273
274
        /**
275
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
276
         * and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
277
         *
278
         * <br>
279
         * Example:
280
         * <pre>{@code
281
         * CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
282
         *   .collect(parallel(i -> foo(i), toList(), executor, 2));
283
         * }</pre>
284
         *
285
         * @param mapper      a transformation to be performed in parallel
286
         * @param collector   the {@code Collector} describing the reduction
287
         * @param executor    the {@code Executor} to use for asynchronous execution
288
         * @param parallelism the max parallelism level
289
         * @param <T>         the type of the collected elements
290
         * @param <R>         the result returned by {@code mapper}
291
         * @param <RR>        the reduction result {@code collector}
292
         *
293
         * @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
294
         *
295
         * @since 2.1.0
296
         */
297
        public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, Executor executor, int parallelism) {
298 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED
            return AsyncParallelCollector.BatchingCollectors
299
              .collectingWithCollector(collector, mapper, executor, parallelism);
300
        }
301
302
        /**
303
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
304
         * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
305
         *
306
         * <br><br>
307
         * The collector maintains the order of processed {@link Stream}. Instances should not be reused.
308
         *
309
         * <br>
310
         * Example:
311
         * <pre>{@code
312
         * CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
313
         *   .collect(parallel(i -> foo(), executor, 2));
314
         * }</pre>
315
         *
316
         * @param mapper      a transformation to be performed in parallel
317
         * @param executor    the {@code Executor} to use for asynchronous execution
318
         * @param parallelism the max parallelism level
319
         * @param <T>         the type of the collected elements
320
         * @param <R>         the result returned by {@code mapper}
321
         *
322
         * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
323
         *
324
         * @since 2.1.0
325
         */
326
        public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, Executor executor, int parallelism) {
327 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED
            return AsyncParallelCollector.BatchingCollectors.collectingToStream(mapper, executor, parallelism);
328
        }
329
330
        /**
331
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
332
         * and returning a {@link Stream} instance returning results as they arrive.
333
         * <p>
334
         * For the parallelism of 1, the stream is executed by the calling thread.
335
         *
336
         * <br>
337
         * Example:
338
         * <pre>{@code
339
         * Stream.of(1, 2, 3)
340
         *   .collect(parallelToStream(i -> foo(), executor, 2))
341
         *   .forEach(System.out::println);
342
         * }</pre>
343
         *
344
         * @param mapper      a transformation to be performed in parallel
345
         * @param executor    the {@code Executor} to use for asynchronous execution
346
         * @param parallelism the max parallelism level
347
         * @param <T>         the type of the collected elements
348
         * @param <R>         the result returned by {@code mapper}
349
         *
350
         * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
351
         *
352
         * @since 2.1.0
353
         */
354
        public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, Executor executor, int parallelism) {
355 1 1. parallelToStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToStream → KILLED
            return ParallelStreamCollector.BatchingCollectors.streaming(mapper, executor, parallelism);
356
        }
357
358
        /**
359
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
360
         * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
361
         * <p>
362
         * For the parallelism of 1, the stream is executed by the calling thread.
363
         *
364
         * <br>
365
         * Example:
366
         * <pre>{@code
367
         * Stream.of(1, 2, 3)
368
         *   .collect(parallelToOrderedStream(i -> foo(), executor, 2))
369
         *   .forEach(System.out::println);
370
         * }</pre>
371
         *
372
         * @param mapper      a transformation to be performed in parallel
373
         * @param executor    the {@code Executor} to use for asynchronous execution
374
         * @param parallelism the max parallelism level
375
         * @param <T>         the type of the collected elements
376
         * @param <R>         the result returned by {@code mapper}
377
         *
378
         * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
379
         *
380
         * @since 2.1.0
381
         */
382
        public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, Executor executor, int parallelism) {
383 1 1. parallelToOrderedStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToOrderedStream → KILLED
            return ParallelStreamCollector.BatchingCollectors.streamingOrdered(mapper, executor, parallelism);
384
        }
385
    }
386
}

Mutations

43

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#7]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED

70

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#70]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED

96

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#42]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED

124

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#101]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED

150

1.1
Location : parallelToStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#5]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → KILLED

178

1.1
Location : parallelToStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#22]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → KILLED

204

1.1
Location : parallelToOrderedStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#11]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → KILLED

232

1.1
Location : parallelToOrderedStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#34]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → KILLED

249

1.1
Location : toFuture
Killed by : com.pivovarit.collectors.FutureCollectorsTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FutureCollectorsTest]/[method:shouldCollectToList()]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED

263

1.1
Location : toFuture
Killed by : com.pivovarit.collectors.FutureCollectorsTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FutureCollectorsTest]/[method:shouldCollect()]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED

298

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:batching_collectors()]/[dynamic-test:#40]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED

327

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:batching_collectors()]/[dynamic-test:#57]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED

355

1.1
Location : parallelToStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#32]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToStream → KILLED

383

1.1
Location : parallelToOrderedStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#45]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToOrderedStream → KILLED

Active mutators

Tests examined


Report generated by PIT 1.16.0