ParallelCollectors.java

1
package com.pivovarit.collectors;
2
3
import java.util.List;
4
import java.util.concurrent.CompletableFuture;
5
import java.util.concurrent.Executor;
6
import java.util.function.Function;
7
import java.util.stream.Collector;
8
import java.util.stream.Stream;
9
10
/**
11
 * An umbrella class exposing static factory methods for instantiating parallel {@link Collector}s
12
 *
13
 * @author Grzegorz Piwowarek
14
 */
15
public final class ParallelCollectors {
16
17
    private ParallelCollectors() {
18
    }
19
20
    /**
21
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
22
     * and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
23
     *
24
     * <br><br>
25
     * The maximum parallelism level defaults to {@code Runtime.availableProcessors() - 1}
26
     *
27
     * <br>
28
     * Example:
29
     * <pre>{@code
30
     * CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
31
     *   .collect(parallel(i -> foo(i), toList(), executor));
32
     * }</pre>
33
     *
34
     * @param mapper    a transformation to be performed in parallel
35
     * @param collector the {@code Collector} describing the reduction
36
     * @param executor  the {@code Executor} to use for asynchronous execution
37
     * @param <T>       the type of the collected elements
38
     * @param <R>       the result returned by {@code mapper}
39
     * @param <RR>      the reduction result {@code collector}
40
     *
41
     * @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
42
     *
43
     * @since 2.0.0
44
     * @deprecated use {@link ParallelCollectors#parallel(Function, Collector, Executor, int)}
45
     */
46
    public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, Executor executor) {
47 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → NO_COVERAGE
        return AsyncParallelCollector.collectingWithCollector(collector, mapper, executor);
48
    }
49
50
    /**
51
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
52
     * and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
53
     *
54
     * <br>
55
     * Example:
56
     * <pre>{@code
57
     * CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
58
     *   .collect(parallel(i -> foo(i), toList(), executor, 2));
59
     * }</pre>
60
     *
61
     * @param mapper      a transformation to be performed in parallel
62
     * @param collector   the {@code Collector} describing the reduction
63
     * @param executor    the {@code Executor} to use for asynchronous execution
64
     * @param <T>         the type of the collected elements
65
     * @param <R>         the result returned by {@code mapper}
66
     * @param <RR>        the reduction result {@code collector}
67
     * @param parallelism the max parallelism level
68
     *
69
     * @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
70
     *
71
     * @since 2.0.0
72
     */
73
    public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, Executor executor, int parallelism) {
74 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED
        return AsyncParallelCollector.collectingWithCollector(collector, mapper, executor, parallelism);
75
    }
76
77
    /**
78
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
79
     * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements
80
     *
81
     * <br><br>
82
     * The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} but not less than 4
83
     *
84
     * <br><br>
85
     * The collector maintains the order of processed {@link Stream}. Instances should not be reused.
86
     *
87
     * <br>
88
     * Example:
89
     * <pre>{@code
90
     * CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
91
     *   .collect(parallel(i -> foo(), executor));
92
     * }</pre>
93
     *
94
     * @param mapper   a transformation to be performed in parallel
95
     * @param executor the {@code Executor} to use for asynchronous execution
96
     * @param <T>      the type of the collected elements
97
     * @param <R>      the result returned by {@code mapper}
98
     *
99
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
100
     *
101
     * @since 2.0.0
102
     * @deprecated use {@link ParallelCollectors#parallel(Function, Executor, int)}
103
     */
104
    public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, Executor executor) {
105 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → NO_COVERAGE
        return AsyncParallelCollector.collectingToStream(mapper, executor);
106
    }
107
108
    /**
109
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
110
     * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
111
     *
112
     * <br><br>
113
     * The collector maintains the order of processed {@link Stream}. Instances should not be reused.
114
     *
115
     * <br>
116
     * Example:
117
     * <pre>{@code
118
     * CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
119
     *   .collect(parallel(i -> foo(), executor, 2));
120
     * }</pre>
121
     *
122
     * @param mapper      a transformation to be performed in parallel
123
     * @param executor    the {@code Executor} to use for asynchronous execution
124
     * @param parallelism the max parallelism level
125
     * @param <T>         the type of the collected elements
126
     * @param <R>         the result returned by {@code mapper}
127
     *
128
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
129
     *
130
     * @since 2.0.0
131
     */
132
    public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, Executor executor, int parallelism) {
133 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED
        return AsyncParallelCollector.collectingToStream(mapper, executor, parallelism);
134
    }
135
136
    /**
137
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
138
     * and returning a {@link Stream} instance returning results in completion order
139
     * <p>
140
     * For the parallelism of 1, the stream is executed by the calling thread.
141
     *
142
     * <br><br>
143
     * The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} but not less than 4
144
     *
145
     * <br><br>
146
     * Instances should not be reused.
147
     *
148
     * <br>
149
     * Example:
150
     * <pre>{@code
151
     * List<String> result = Stream.of(1, 2, 3)
152
     *   .collect(parallelToStream(i -> foo(), executor))
153
     *   .collect(toList());
154
     * }</pre>
155
     *
156
     * @param mapper   a transformation to be performed in parallel
157
     * @param executor the {@code Executor} to use for asynchronous execution
158
     * @param <T>      the type of the collected elements
159
     * @param <R>      the result returned by {@code mapper}
160
     *
161
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
162
     *
163
     * @since 2.0.0
164
     * @deprecated use {@link ParallelCollectors#parallelToStream(Function, Executor, int)}
165
     */
166
    public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, Executor executor) {
167 1 1. parallelToStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → NO_COVERAGE
        return ParallelStreamCollector.streaming(mapper, executor);
168
    }
169
170
    /**
171
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
172
     * and returning a {@link Stream} instance returning results as they arrive.
173
     * <p>
174
     * For the parallelism of 1, the stream is executed by the calling thread.
175
     *
176
     * <br>
177
     * Example:
178
     * <pre>{@code
179
     * Stream.of(1, 2, 3)
180
     *   .collect(parallelToStream(i -> foo(), executor, 2))
181
     *   .forEach(System.out::println);
182
     * }</pre>
183
     *
184
     * @param mapper      a transformation to be performed in parallel
185
     * @param executor    the {@code Executor} to use for asynchronous execution
186
     * @param parallelism the max parallelism level
187
     * @param <T>         the type of the collected elements
188
     * @param <R>         the result returned by {@code mapper}
189
     *
190
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
191
     *
192
     * @since 2.0.0
193
     */
194
    public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, Executor executor, int parallelism) {
195 1 1. parallelToStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → KILLED
        return ParallelStreamCollector.streaming(mapper, executor, parallelism);
196
    }
197
198
    /**
199
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
200
     * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
201
     * <p>
202
     * For the parallelism of 1, the stream is executed by the calling thread.
203
     *
204
     * <br><br>
205
     * The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} but not less than 4
206
     *
207
     * <br><br>
208
     * Instances should not be reused.
209
     * <br><br>
210
     * Example:
211
     * <pre>{@code
212
     * Stream.of(1, 2, 3)
213
     *   .collect(parallelToOrderedStream(i -> foo(), executor))
214
     *   .forEach(System.out::println);
215
     * }</pre>
216
     *
217
     * @param mapper   a transformation to be performed in parallel
218
     * @param executor the {@code Executor} to use for asynchronous execution
219
     * @param <T>      the type of the collected elements
220
     * @param <R>      the result returned by {@code mapper}
221
     *
222
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
223
     *
224
     * @since 2.0.0
225
     * @deprecated use {@link ParallelCollectors#parallelToOrderedStream(Function, Executor, int)}
226
     */
227
    public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, Executor executor) {
228 1 1. parallelToOrderedStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → NO_COVERAGE
        return ParallelStreamCollector.streamingOrdered(mapper, executor);
229
    }
230
231
    /**
232
     * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
233
     * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
234
     * <p>
235
     * For the parallelism of 1, the stream is executed by the calling thread.
236
     *
237
     * <br>
238
     * Example:
239
     * <pre>{@code
240
     * Stream.of(1, 2, 3)
241
     *   .collect(parallelToOrderedStream(i -> foo(), executor, 2))
242
     *   .forEach(System.out::println);
243
     * }</pre>
244
     *
245
     * @param mapper      a transformation to be performed in parallel
246
     * @param executor    the {@code Executor} to use for asynchronous execution
247
     * @param parallelism the max parallelism level
248
     * @param <T>         the type of the collected elements
249
     * @param <R>         the result returned by {@code mapper}
250
     *
251
     * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
252
     *
253
     * @since 2.0.0
254
     */
255
    public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, Executor executor, int parallelism) {
256 1 1. parallelToOrderedStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → KILLED
        return ParallelStreamCollector.streamingOrdered(mapper, executor, parallelism);
257
    }
258
259
    /**
260
     * A convenience {@code Collector} for collecting a {@code Stream<CompletableFuture<T>>}
261
     * into a {@code CompletableFuture<R>} using a provided {@code Collector<T, ?, R>}
262
     *
263
     * @param collector the {@code Collector} describing the reduction
264
     * @param <T>       the type of the collected elements
265
     * @param <R>       the result of the transformation
266
     *
267
     * @return a {@code Collector} which collects all futures and combines them into a single future
268
     * using the provided downstream {@code Collector}
269
     *
270
     * @since 2.3.0
271
     */
272
    public static <T, R> Collector<CompletableFuture<T>, ?, CompletableFuture<R>> toFuture(Collector<T, ?, R> collector) {
273 1 1. toFuture : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED
        return FutureCollectors.toFuture(collector);
274
    }
275
276
    /**
277
     * A convenience {@code Collector} for collecting a {@code Stream<CompletableFuture<T>>} into a {@code CompletableFuture<List<T>>}
278
     *
279
     * @param <T> the type of the collected elements
280
     *
281
     * @return a {@code Collector} which collects all futures and combines them into a single future
282
     * returning a list of results
283
     *
284
     * @since 2.3.0
285
     */
286
    public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> toFuture() {
287 1 1. toFuture : replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED
        return FutureCollectors.toFuture();
288
    }
289
290
    /**
291
     * A subset of collectors which perform operations in batches and not separately (one object in a thread pool's worker queue represents a batch of operations to be performed by a single thread)
292
     */
293
    public static final class Batching {
294
295
        private Batching() {
296
        }
297
298
        /**
299
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
300
         * and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
301
         *
302
         * <br>
303
         * Example:
304
         * <pre>{@code
305
         * CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
306
         *   .collect(parallel(i -> foo(i), toList(), executor, 2));
307
         * }</pre>
308
         *
309
         * @param mapper      a transformation to be performed in parallel
310
         * @param collector   the {@code Collector} describing the reduction
311
         * @param executor    the {@code Executor} to use for asynchronous execution
312
         * @param parallelism the max parallelism level
313
         * @param <T>         the type of the collected elements
314
         * @param <R>         the result returned by {@code mapper}
315
         * @param <RR>        the reduction result {@code collector}
316
         *
317
         * @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
318
         *
319
         * @since 2.1.0
320
         */
321
        public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, Executor executor, int parallelism) {
322 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED
            return AsyncParallelCollector.BatchingCollectors
323
              .collectingWithCollector(collector, mapper, executor, parallelism);
324
        }
325
326
        /**
327
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
328
         * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
329
         *
330
         * <br><br>
331
         * The collector maintains the order of processed {@link Stream}. Instances should not be reused.
332
         *
333
         * <br>
334
         * Example:
335
         * <pre>{@code
336
         * CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
337
         *   .collect(parallel(i -> foo(), executor, 2));
338
         * }</pre>
339
         *
340
         * @param mapper      a transformation to be performed in parallel
341
         * @param executor    the {@code Executor} to use for asynchronous execution
342
         * @param parallelism the max parallelism level
343
         * @param <T>         the type of the collected elements
344
         * @param <R>         the result returned by {@code mapper}
345
         *
346
         * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
347
         *
348
         * @since 2.1.0
349
         */
350
        public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, Executor executor, int parallelism) {
351 1 1. parallel : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED
            return AsyncParallelCollector.BatchingCollectors.collectingToStream(mapper, executor, parallelism);
352
        }
353
354
        /**
355
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
356
         * and returning a {@link Stream} instance returning results as they arrive.
357
         * <p>
358
         * For the parallelism of 1, the stream is executed by the calling thread.
359
         *
360
         * <br>
361
         * Example:
362
         * <pre>{@code
363
         * Stream.of(1, 2, 3)
364
         *   .collect(parallelToStream(i -> foo(), executor, 2))
365
         *   .forEach(System.out::println);
366
         * }</pre>
367
         *
368
         * @param mapper      a transformation to be performed in parallel
369
         * @param executor    the {@code Executor} to use for asynchronous execution
370
         * @param parallelism the max parallelism level
371
         * @param <T>         the type of the collected elements
372
         * @param <R>         the result returned by {@code mapper}
373
         *
374
         * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
375
         *
376
         * @since 2.1.0
377
         */
378
        public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, Executor executor, int parallelism) {
379 1 1. parallelToStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToStream → KILLED
            return ParallelStreamCollector.BatchingCollectors.streaming(mapper, executor, parallelism);
380
        }
381
382
        /**
383
         * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
384
         * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
385
         * <p>
386
         * For the parallelism of 1, the stream is executed by the calling thread.
387
         *
388
         * <br>
389
         * Example:
390
         * <pre>{@code
391
         * Stream.of(1, 2, 3)
392
         *   .collect(parallelToOrderedStream(i -> foo(), executor, 2))
393
         *   .forEach(System.out::println);
394
         * }</pre>
395
         *
396
         * @param mapper      a transformation to be performed in parallel
397
         * @param executor    the {@code Executor} to use for asynchronous execution
398
         * @param parallelism the max parallelism level
399
         * @param <T>         the type of the collected elements
400
         * @param <R>         the result returned by {@code mapper}
401
         *
402
         * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
403
         *
404
         * @since 2.1.0
405
         */
406
        public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, Executor executor, int parallelism) {
407 1 1. parallelToOrderedStream : replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToOrderedStream → KILLED
            return ParallelStreamCollector.BatchingCollectors.streamingOrdered(mapper, executor, parallelism);
408
        }
409
    }
410
}

Mutations

47

1.1
Location : parallel
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → NO_COVERAGE

74

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#8]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED

105

1.1
Location : parallel
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → NO_COVERAGE

133

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:collectors()]/[dynamic-test:#50]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallel → KILLED

167

1.1
Location : parallelToStream
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → NO_COVERAGE

195

1.1
Location : parallelToStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#6]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToStream → KILLED

228

1.1
Location : parallelToOrderedStream
Killed by : none
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → NO_COVERAGE

256

1.1
Location : parallelToOrderedStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_collectors()]/[dynamic-test:#15]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::parallelToOrderedStream → KILLED

273

1.1
Location : toFuture
Killed by : com.pivovarit.collectors.FutureCollectorsTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FutureCollectorsTest]/[method:shouldCollectToList()]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED

287

1.1
Location : toFuture
Killed by : com.pivovarit.collectors.FutureCollectorsTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FutureCollectorsTest]/[method:shouldCollect()]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors::toFuture → KILLED

322

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:batching_collectors()]/[dynamic-test:#25]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED

351

1.1
Location : parallel
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:batching_collectors()]/[dynamic-test:#58]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallel → KILLED

379

1.1
Location : parallelToStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#6]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToStream → KILLED

407

1.1
Location : parallelToOrderedStream
Killed by : com.pivovarit.collectors.FunctionalTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.FunctionalTest]/[test-factory:streaming_batching_collectors()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/ParallelCollectors$Batching::parallelToOrderedStream → KILLED

Active mutators

Tests examined


Report generated by PIT 1.9.8