AsyncParallelStreamingCollector.java

1
/*
2
 * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 * https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.pivovarit.collectors;
17
18
import java.util.ArrayList;
19
import java.util.Collection;
20
import java.util.EnumSet;
21
import java.util.List;
22
import java.util.Set;
23
import java.util.concurrent.CompletableFuture;
24
import java.util.concurrent.Executor;
25
import java.util.function.BiConsumer;
26
import java.util.function.BinaryOperator;
27
import java.util.function.Function;
28
import java.util.function.Supplier;
29
import java.util.stream.Collector;
30
import java.util.stream.Stream;
31
import java.util.stream.StreamSupport;
32
33
import static com.pivovarit.collectors.BatchingSpliterator.batching;
34
import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
35
import static java.util.Collections.emptySet;
36
37
/**
38
 * @author Grzegorz Piwowarek
39
 */
40
final class AsyncParallelStreamingCollector<T, R> extends AbstractParallelCollector<T, R, Stream<R>> {
41
42
    private static final EnumSet<Characteristics> UNORDERED_CHARACTERISTICS = EnumSet.of(Characteristics.UNORDERED);
43
44
    private final CompletionStrategy completionStrategy;
45
46
    AsyncParallelStreamingCollector(Function<? super T, ? extends R> task, Dispatcher<R> dispatcher, boolean ordered) {
47
        super(task, dispatcher);
48 1 1. <init> : negated conditional → KILLED
        this.completionStrategy = ordered ? CompletionStrategy.ORDERED : CompletionStrategy.UNORDERED;
49
    }
50
51
    @Override
52
    public Function<List<CompletableFuture<R>>, Stream<R>> finalizer() {
53 2 1. lambda$finalizer$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$finalizer$0 → KILLED
2. finalizer : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::finalizer → KILLED
        return acc -> switch (completionStrategy) {
54
            case ORDERED -> acc.stream().map(CompletableFuture::join);
55
            case UNORDERED -> StreamSupport.stream(new CompletionOrderSpliterator<>(acc), false);
56
        };
57
    }
58
59
    @Override
60
    public Set<Characteristics> characteristics() {
61 1 1. characteristics : replaced return value with Collections.emptySet for com/pivovarit/collectors/AsyncParallelStreamingCollector::characteristics → SURVIVED
        return switch (completionStrategy) {
62
            case ORDERED -> emptySet();
63
            case UNORDERED -> UNORDERED_CHARACTERISTICS;
64
        };
65
    }
66
67
    record BatchingCollector<T, R>(Function<? super T, ? extends R> task, Executor executor, int parallelism, boolean ordered)
68
      implements Collector<T, ArrayList<T>, Stream<R>> {
69
70
        @Override
71
        public Supplier<ArrayList<T>> supplier() {
72 1 1. supplier : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::supplier → KILLED
            return ArrayList::new;
73
        }
74
75
        @Override
76
        public BiConsumer<ArrayList<T>, T> accumulator() {
77 1 1. accumulator : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::accumulator → KILLED
            return ArrayList::add;
78
        }
79
80
        @Override
81
        public BinaryOperator<ArrayList<T>> combiner() {
82 1 1. combiner : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::combiner → KILLED
            return (left, right) -> {
83
                throw new UnsupportedOperationException("using parallel stream with parallel collectors is not supported");
84
            };
85
        }
86
87
        @Override
88
        public Function<ArrayList<T>, Stream<R>> finisher() {
89 1 1. finisher : replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::finisher → KILLED
            return items -> {
90 1 1. lambda$finisher$0 : negated conditional → KILLED
                if (items.size() == parallelism) {
91 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return items.stream()
92
                      .collect(new AsyncParallelStreamingCollector<>(task, new Dispatcher<>(executor, parallelism), ordered));
93
                } else {
94 1 1. lambda$finisher$0 : replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED
                    return partitioned(items, parallelism)
95
                      .collect(new AsyncParallelStreamingCollector<>(batching(task), new Dispatcher<>(executor, parallelism), ordered))
96
                      .flatMap(Collection::stream);
97
                }
98
            };
99
        }
100
101
        @Override
102
        public Set<Characteristics> characteristics() {
103
            return emptySet();
104
        }
105
    }
106
}

Mutations

48

1.1
Location : <init>
Killed by : com.pivovarit.collectors.test.GroupingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.GroupingTest]/[test-factory:shouldGroupByClassifierOrdered()]/[dynamic-test:#2]
negated conditional → KILLED

53

1.1
Location : lambda$finalizer$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElementsInOrder()]/[dynamic-test:#18]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector::lambda$finalizer$0 → KILLED

2.2
Location : finalizer
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessEmpty()]/[dynamic-test:#18]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector::finalizer → KILLED

61

1.1
Location : characteristics
Killed by : none
replaced return value with Collections.emptySet for com/pivovarit/collectors/AsyncParallelStreamingCollector::characteristics → SURVIVED
Covering tests

72

1.1
Location : supplier
Killed by : com.pivovarit.collectors.test.ParallelismValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ParallelismValidationTest]/[test-factory:shouldThrowOnParallelStream()]/[dynamic-test:#19]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::supplier → KILLED

77

1.1
Location : accumulator
Killed by : com.pivovarit.collectors.test.ParallelismValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ParallelismValidationTest]/[test-factory:shouldThrowOnParallelStream()]/[dynamic-test:#19]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::accumulator → KILLED

82

1.1
Location : combiner
Killed by : com.pivovarit.collectors.test.ParallelismValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ParallelismValidationTest]/[test-factory:shouldThrowOnParallelStream()]/[dynamic-test:#19]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::combiner → KILLED

89

1.1
Location : finisher
Killed by : com.pivovarit.collectors.test.ParallelismValidationTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.ParallelismValidationTest]/[test-factory:shouldThrowOnParallelStream()]/[dynamic-test:#19]
replaced return value with null for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::finisher → KILLED

90

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BatchingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BatchingTest]/[test-factory:shouldProcessOnExactlyNThreads()]/[dynamic-test:#4]
negated conditional → KILLED

91

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.RejectedExecutionHandlingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.RejectedExecutionHandlingTest]/[test-factory:shouldRejectInvalidRejectedExecutionHandlerFactory()]/[dynamic-test:#8]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED

94

1.1
Location : lambda$finisher$0
Killed by : com.pivovarit.collectors.test.BasicProcessingTest.[engine:junit-jupiter]/[class:com.pivovarit.collectors.test.BasicProcessingTest]/[test-factory:shouldProcessAllElements()]/[dynamic-test:#31]
replaced return value with Stream.empty for com/pivovarit/collectors/AsyncParallelStreamingCollector$BatchingCollector::lambda$finisher$0 → KILLED

Active mutators

Tests examined


Report generated by PIT 1.22.0