AsyncParallelCollector.java

1
/*
2
 * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 * https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.pivovarit.collectors;
17
18
import java.util.ArrayList;
19
import java.util.Collection;
20
import java.util.Collections;
21
import java.util.List;
22
import java.util.Set;
23
import java.util.concurrent.CompletableFuture;
24
import java.util.concurrent.Executor;
25
import java.util.function.BiConsumer;
26
import java.util.function.BinaryOperator;
27
import java.util.function.Function;
28
import java.util.function.Supplier;
29
import java.util.stream.Collector;
30
import java.util.stream.Stream;
31
32
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
33
import static java.util.concurrent.CompletableFuture.allOf;
34
35
/**
36
 * @author Grzegorz Piwowarek
37
 */
38
final class AsyncParallelCollector<T, R, C> extends AbstractParallelCollector<T, R, CompletableFuture<C>> {
39
40
    private final Function<Stream<R>, C> finalizer;
41
42
    AsyncParallelCollector(Function<? super T, ? extends R> task, Dispatcher<R> dispatcher, Function<Stream<R>, C> finalizer) {
43
        super(task, dispatcher);
44
        this.finalizer = finalizer;
45
    }
46
47
    @Override
48
    public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finalizer() {
49 1 1. finalizer : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::finalizer → KILLED
        return futures -> {
50 1 1. lambda$finalizer$1 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finalizer$1 → KILLED
            var combined = allOf(futures.toArray(CompletableFuture[]::new))
51 1 1. lambda$finalizer$2 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$finalizer$2 → KILLED
              .thenApply(__ -> futures.stream().map(CompletableFuture::join));
52
53
            for (var future : futures) {
54
                future.whenComplete((o, ex) -> {
55 1 1. lambda$finalizer$3 : negated conditional → SURVIVED
                    if (ex != null) {
56
                        combined.completeExceptionally(ex);
57
                    }
58
                });
59
            }
60
61 1 1. lambda$finalizer$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finalizer$0 → KILLED
            return combined.thenApply(finalizer);
62
        };
63
    }
64
65
    record BatchingCollector<T, R, C>(Function<? super T, ? extends R> task, Function<Stream<R>, C> finalizer,
66
                                      Executor executor, int parallelism)
67
      implements Collector<T, ArrayList<T>, CompletableFuture<C>> {
68
69
        @Override
70
        public Supplier<ArrayList<T>> supplier() {
71 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::supplier → KILLED
            return ArrayList::new;
72
        }
73
74
        @Override
75
        public BiConsumer<ArrayList<T>, T> accumulator() {
76 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::accumulator → KILLED
            return ArrayList::add;
77
        }
78
79
        @Override
80
        public BinaryOperator<ArrayList<T>> combiner() {
81 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::combiner → KILLED
            return (left, right) -> {
82
                throw new UnsupportedOperationException("using parallel stream with parallel collectors is not supported");
83
            };
84
        }
85
86
        @Override
87
        public Function<ArrayList<T>, CompletableFuture<C>> finisher() {
88 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::finisher → KILLED
            return items -> {
89 1 1. lambda$finisher$0 : negated conditional → KILLED
                if (items.size() == parallelism) {
90 1 1. lambda$finisher$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return items.stream()
91
                      .collect((Collector<T, ?, CompletableFuture<C>>) new AsyncParallelCollector<T, R, C>(task, new Dispatcher<>(executor, parallelism), finalizer));
92
                } else {
93 1 1. lambda$finisher$0 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return partitioned(items, parallelism)
94
                      .collect((Collector<List<T>, ?, CompletableFuture<C>>) new AsyncParallelCollector<List<T>, List<R>, C>((Function<? super List<T>, ? extends List<R>>) batch -> {
95
                          List<R> list = new ArrayList<>(batch.size());
96
                          for (T t : batch) {
97
                              list.add(task.apply(t));
98
                          }
99 1 1. lambda$finisher$1 : replaced return value with Collections.emptyList for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$1 → KILLED
                          return list;
100 1 1. lambda$finisher$2 : replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$2 → KILLED
                      }, new Dispatcher<>(executor, parallelism), r -> finalizer.apply(r.flatMap(Collection::stream))));
101
                }
102
            };
103
        }
104
105
        @Override
106
        public Set<Characteristics> characteristics() {
107
            return Collections.emptySet();
108
        }
109
    }
110
}

Mutations

49

1.1
Location : finalizer
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::finalizer → KILLED

50

1.1
Location : lambda$finalizer$1
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finalizer$1 → KILLED

51

1.1
Location : lambda$finalizer$2
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#9]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelCollector::lambda$finalizer$2 → KILLED

55

1.1
Location : lambda$finalizer$3
Killed by : none
negated conditional → SURVIVED
Covering tests

61

1.1
Location : lambda$finalizer$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#14]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector::lambda$finalizer$0 → KILLED

71

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#138]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::supplier → KILLED

76

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.ParallelismValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ParallelismValidationTest]/[test-factory:shouldThrowOnParallelStream()]/[dynamic-test:#9]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::accumulator → KILLED

81

1.1
Location : combiner
Killed by : com.pivovarit.collectors.test.ParallelismValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ParallelismValidationTest]/[test-factory:shouldThrowOnParallelStream()]/[dynamic-test:#9]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::combiner → KILLED

88

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#138]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::finisher → KILLED

89

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BatchingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BatchingTest]/[test-factory:shouldProcessOnExactlyNThreads()]/[dynamic-test:#2]
negated conditional → KILLED

90

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#138]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED

93

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#138]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$0 → KILLED

99

1.1
Location : lambda$finisher$1
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessAllElementsWithMaxParallelism()]/[dynamic-test:#24]
replaced return value with Collections.emptyList for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$1 → KILLED

100

1.1
Location : lambda$finisher$2
Killed by : com.pivovarit.collectors.test.BasicParallelismTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicParallelismTest]/[test-factory:shouldProcessEmptyWithMaxParallelism()]/[dynamic-test:#138]
replaced return value with null for com/pivovarit/collectors/AsyncParallelCollector$BatchingCollector::lambda$finisher$2 → KILLED

Active mutators

Tests examined


Report generated by PIT 1.22.0