1
25 package java.util.stream;
26
27 import java.util.Objects;
28 import java.util.Optional;
29 import java.util.OptionalDouble;
30 import java.util.OptionalInt;
31 import java.util.OptionalLong;
32 import java.util.Spliterator;
33 import java.util.concurrent.CountedCompleter;
34 import java.util.function.BiConsumer;
35 import java.util.function.BiFunction;
36 import java.util.function.BinaryOperator;
37 import java.util.function.DoubleBinaryOperator;
38 import java.util.function.IntBinaryOperator;
39 import java.util.function.LongBinaryOperator;
40 import java.util.function.ObjDoubleConsumer;
41 import java.util.function.ObjIntConsumer;
42 import java.util.function.ObjLongConsumer;
43 import java.util.function.Supplier;
44
45
51 final class ReduceOps {
52
53 private ReduceOps() { }
54
55
68 public static <T, U> TerminalOp<T, U>
69 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
70 Objects.requireNonNull(reducer);
71 Objects.requireNonNull(combiner);
72 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
73 @Override
74 public void begin(long size) {
75 state = seed;
76 }
77
78 @Override
79 public void accept(T t) {
80 state = reducer.apply(state, t);
81 }
82
83 @Override
84 public void combine(ReducingSink other) {
85 state = combiner.apply(state, other.state);
86 }
87 }
88 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
89 @Override
90 public ReducingSink makeSink() {
91 return new ReducingSink();
92 }
93 };
94 }
95
96
104 public static <T> TerminalOp<T, Optional<T>>
105 makeRef(BinaryOperator<T> operator) {
106 Objects.requireNonNull(operator);
107 class ReducingSink
108 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
109 private boolean empty;
110 private T state;
111
112 public void begin(long size) {
113 empty = true;
114 state = null;
115 }
116
117 @Override
118 public void accept(T t) {
119 if (empty) {
120 empty = false;
121 state = t;
122 } else {
123 state = operator.apply(state, t);
124 }
125 }
126
127 @Override
128 public Optional<T> get() {
129 return empty ? Optional.empty() : Optional.of(state);
130 }
131
132 @Override
133 public void combine(ReducingSink other) {
134 if (!other.empty)
135 accept(other.state);
136 }
137 }
138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139 @Override
140 public ReducingSink makeSink() {
141 return new ReducingSink();
142 }
143 };
144 }
145
146
155 public static <T, I> TerminalOp<T, I>
156 makeRef(Collector<? super T, I, ?> collector) {
157 Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
158 BiConsumer<I, ? super T> accumulator = collector.accumulator();
159 BinaryOperator<I> combiner = collector.combiner();
160 class ReducingSink extends Box<I>
161 implements AccumulatingSink<T, I, ReducingSink> {
162 @Override
163 public void begin(long size) {
164 state = supplier.get();
165 }
166
167 @Override
168 public void accept(T t) {
169 accumulator.accept(state, t);
170 }
171
172 @Override
173 public void combine(ReducingSink other) {
174 state = combiner.apply(state, other.state);
175 }
176 }
177 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
178 @Override
179 public ReducingSink makeSink() {
180 return new ReducingSink();
181 }
182
183 @Override
184 public int getOpFlags() {
185 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
186 ? StreamOpFlag.NOT_ORDERED
187 : 0;
188 }
189 };
190 }
191
192
204 public static <T, R> TerminalOp<T, R>
205 makeRef(Supplier<R> seedFactory,
206 BiConsumer<R, ? super T> accumulator,
207 BiConsumer<R,R> reducer) {
208 Objects.requireNonNull(seedFactory);
209 Objects.requireNonNull(accumulator);
210 Objects.requireNonNull(reducer);
211 class ReducingSink extends Box<R>
212 implements AccumulatingSink<T, R, ReducingSink> {
213 @Override
214 public void begin(long size) {
215 state = seedFactory.get();
216 }
217
218 @Override
219 public void accept(T t) {
220 accumulator.accept(state, t);
221 }
222
223 @Override
224 public void combine(ReducingSink other) {
225 reducer.accept(state, other.state);
226 }
227 }
228 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
229 @Override
230 public ReducingSink makeSink() {
231 return new ReducingSink();
232 }
233 };
234 }
235
236
246 public static <T> TerminalOp<T, Long>
247 makeRefCounting() {
248 return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
249 @Override
250 public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
251
252 @Override
253 public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
254 Spliterator<P_IN> spliterator) {
255 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
256 return spliterator.getExactSizeIfKnown();
257 return super.evaluateSequential(helper, spliterator);
258 }
259
260 @Override
261 public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
262 Spliterator<P_IN> spliterator) {
263 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
264 return spliterator.getExactSizeIfKnown();
265 return super.evaluateParallel(helper, spliterator);
266 }
267
268 @Override
269 public int getOpFlags() {
270 return StreamOpFlag.NOT_ORDERED;
271 }
272 };
273 }
274
275
283 public static TerminalOp<Integer, Integer>
284 makeInt(int identity, IntBinaryOperator operator) {
285 Objects.requireNonNull(operator);
286 class ReducingSink
287 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
288 private int state;
289
290 @Override
291 public void begin(long size) {
292 state = identity;
293 }
294
295 @Override
296 public void accept(int t) {
297 state = operator.applyAsInt(state, t);
298 }
299
300 @Override
301 public Integer get() {
302 return state;
303 }
304
305 @Override
306 public void combine(ReducingSink other) {
307 accept(other.state);
308 }
309 }
310 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
311 @Override
312 public ReducingSink makeSink() {
313 return new ReducingSink();
314 }
315 };
316 }
317
318
325 public static TerminalOp<Integer, OptionalInt>
326 makeInt(IntBinaryOperator operator) {
327 Objects.requireNonNull(operator);
328 class ReducingSink
329 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
330 private boolean empty;
331 private int state;
332
333 public void begin(long size) {
334 empty = true;
335 state = 0;
336 }
337
338 @Override
339 public void accept(int t) {
340 if (empty) {
341 empty = false;
342 state = t;
343 }
344 else {
345 state = operator.applyAsInt(state, t);
346 }
347 }
348
349 @Override
350 public OptionalInt get() {
351 return empty ? OptionalInt.empty() : OptionalInt.of(state);
352 }
353
354 @Override
355 public void combine(ReducingSink other) {
356 if (!other.empty)
357 accept(other.state);
358 }
359 }
360 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
361 @Override
362 public ReducingSink makeSink() {
363 return new ReducingSink();
364 }
365 };
366 }
367
368
379 public static <R> TerminalOp<Integer, R>
380 makeInt(Supplier<R> supplier,
381 ObjIntConsumer<R> accumulator,
382 BinaryOperator<R> combiner) {
383 Objects.requireNonNull(supplier);
384 Objects.requireNonNull(accumulator);
385 Objects.requireNonNull(combiner);
386 class ReducingSink extends Box<R>
387 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
388 @Override
389 public void begin(long size) {
390 state = supplier.get();
391 }
392
393 @Override
394 public void accept(int t) {
395 accumulator.accept(state, t);
396 }
397
398 @Override
399 public void combine(ReducingSink other) {
400 state = combiner.apply(state, other.state);
401 }
402 }
403 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
404 @Override
405 public ReducingSink makeSink() {
406 return new ReducingSink();
407 }
408 };
409 }
410
411
420 public static TerminalOp<Integer, Long>
421 makeIntCounting() {
422 return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
423 @Override
424 public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
425
426 @Override
427 public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
428 Spliterator<P_IN> spliterator) {
429 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
430 return spliterator.getExactSizeIfKnown();
431 return super.evaluateSequential(helper, spliterator);
432 }
433
434 @Override
435 public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
436 Spliterator<P_IN> spliterator) {
437 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
438 return spliterator.getExactSizeIfKnown();
439 return super.evaluateParallel(helper, spliterator);
440 }
441
442 @Override
443 public int getOpFlags() {
444 return StreamOpFlag.NOT_ORDERED;
445 }
446 };
447 }
448
449
457 public static TerminalOp<Long, Long>
458 makeLong(long identity, LongBinaryOperator operator) {
459 Objects.requireNonNull(operator);
460 class ReducingSink
461 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
462 private long state;
463
464 @Override
465 public void begin(long size) {
466 state = identity;
467 }
468
469 @Override
470 public void accept(long t) {
471 state = operator.applyAsLong(state, t);
472 }
473
474 @Override
475 public Long get() {
476 return state;
477 }
478
479 @Override
480 public void combine(ReducingSink other) {
481 accept(other.state);
482 }
483 }
484 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
485 @Override
486 public ReducingSink makeSink() {
487 return new ReducingSink();
488 }
489 };
490 }
491
492
499 public static TerminalOp<Long, OptionalLong>
500 makeLong(LongBinaryOperator operator) {
501 Objects.requireNonNull(operator);
502 class ReducingSink
503 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
504 private boolean empty;
505 private long state;
506
507 public void begin(long size) {
508 empty = true;
509 state = 0;
510 }
511
512 @Override
513 public void accept(long t) {
514 if (empty) {
515 empty = false;
516 state = t;
517 }
518 else {
519 state = operator.applyAsLong(state, t);
520 }
521 }
522
523 @Override
524 public OptionalLong get() {
525 return empty ? OptionalLong.empty() : OptionalLong.of(state);
526 }
527
528 @Override
529 public void combine(ReducingSink other) {
530 if (!other.empty)
531 accept(other.state);
532 }
533 }
534 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
535 @Override
536 public ReducingSink makeSink() {
537 return new ReducingSink();
538 }
539 };
540 }
541
542
553 public static <R> TerminalOp<Long, R>
554 makeLong(Supplier<R> supplier,
555 ObjLongConsumer<R> accumulator,
556 BinaryOperator<R> combiner) {
557 Objects.requireNonNull(supplier);
558 Objects.requireNonNull(accumulator);
559 Objects.requireNonNull(combiner);
560 class ReducingSink extends Box<R>
561 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
562 @Override
563 public void begin(long size) {
564 state = supplier.get();
565 }
566
567 @Override
568 public void accept(long t) {
569 accumulator.accept(state, t);
570 }
571
572 @Override
573 public void combine(ReducingSink other) {
574 state = combiner.apply(state, other.state);
575 }
576 }
577 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
578 @Override
579 public ReducingSink makeSink() {
580 return new ReducingSink();
581 }
582 };
583 }
584
585
594 public static TerminalOp<Long, Long>
595 makeLongCounting() {
596 return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
597 @Override
598 public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
599
600 @Override
601 public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
602 Spliterator<P_IN> spliterator) {
603 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
604 return spliterator.getExactSizeIfKnown();
605 return super.evaluateSequential(helper, spliterator);
606 }
607
608 @Override
609 public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
610 Spliterator<P_IN> spliterator) {
611 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
612 return spliterator.getExactSizeIfKnown();
613 return super.evaluateParallel(helper, spliterator);
614 }
615
616 @Override
617 public int getOpFlags() {
618 return StreamOpFlag.NOT_ORDERED;
619 }
620 };
621 }
622
623
631 public static TerminalOp<Double, Double>
632 makeDouble(double identity, DoubleBinaryOperator operator) {
633 Objects.requireNonNull(operator);
634 class ReducingSink
635 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
636 private double state;
637
638 @Override
639 public void begin(long size) {
640 state = identity;
641 }
642
643 @Override
644 public void accept(double t) {
645 state = operator.applyAsDouble(state, t);
646 }
647
648 @Override
649 public Double get() {
650 return state;
651 }
652
653 @Override
654 public void combine(ReducingSink other) {
655 accept(other.state);
656 }
657 }
658 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
659 @Override
660 public ReducingSink makeSink() {
661 return new ReducingSink();
662 }
663 };
664 }
665
666
673 public static TerminalOp<Double, OptionalDouble>
674 makeDouble(DoubleBinaryOperator operator) {
675 Objects.requireNonNull(operator);
676 class ReducingSink
677 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
678 private boolean empty;
679 private double state;
680
681 public void begin(long size) {
682 empty = true;
683 state = 0;
684 }
685
686 @Override
687 public void accept(double t) {
688 if (empty) {
689 empty = false;
690 state = t;
691 }
692 else {
693 state = operator.applyAsDouble(state, t);
694 }
695 }
696
697 @Override
698 public OptionalDouble get() {
699 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
700 }
701
702 @Override
703 public void combine(ReducingSink other) {
704 if (!other.empty)
705 accept(other.state);
706 }
707 }
708 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
709 @Override
710 public ReducingSink makeSink() {
711 return new ReducingSink();
712 }
713 };
714 }
715
716
727 public static <R> TerminalOp<Double, R>
728 makeDouble(Supplier<R> supplier,
729 ObjDoubleConsumer<R> accumulator,
730 BinaryOperator<R> combiner) {
731 Objects.requireNonNull(supplier);
732 Objects.requireNonNull(accumulator);
733 Objects.requireNonNull(combiner);
734 class ReducingSink extends Box<R>
735 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
736 @Override
737 public void begin(long size) {
738 state = supplier.get();
739 }
740
741 @Override
742 public void accept(double t) {
743 accumulator.accept(state, t);
744 }
745
746 @Override
747 public void combine(ReducingSink other) {
748 state = combiner.apply(state, other.state);
749 }
750 }
751 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
752 @Override
753 public ReducingSink makeSink() {
754 return new ReducingSink();
755 }
756 };
757 }
758
759
768 public static TerminalOp<Double, Long>
769 makeDoubleCounting() {
770 return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
771 @Override
772 public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
773
774 @Override
775 public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
776 Spliterator<P_IN> spliterator) {
777 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
778 return spliterator.getExactSizeIfKnown();
779 return super.evaluateSequential(helper, spliterator);
780 }
781
782 @Override
783 public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
784 Spliterator<P_IN> spliterator) {
785 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
786 return spliterator.getExactSizeIfKnown();
787 return super.evaluateParallel(helper, spliterator);
788 }
789
790 @Override
791 public int getOpFlags() {
792 return StreamOpFlag.NOT_ORDERED;
793 }
794 };
795 }
796
797
800 abstract static class CountingSink<T>
801 extends Box<Long>
802 implements AccumulatingSink<T, Long, CountingSink<T>> {
803 long count;
804
805 @Override
806 public void begin(long size) {
807 count = 0L;
808 }
809
810 @Override
811 public Long get() {
812 return count;
813 }
814
815 @Override
816 public void combine(CountingSink<T> other) {
817 count += other.count;
818 }
819
820 static final class OfRef<T> extends CountingSink<T> {
821 @Override
822 public void accept(T t) {
823 count++;
824 }
825 }
826
827 static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
828 @Override
829 public void accept(int t) {
830 count++;
831 }
832 }
833
834 static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
835 @Override
836 public void accept(long t) {
837 count++;
838 }
839 }
840
841 static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
842 @Override
843 public void accept(double t) {
844 count++;
845 }
846 }
847 }
848
849
858 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
859 extends TerminalSink<T, R> {
860 void combine(K other);
861 }
862
863
869 private abstract static class Box<U> {
870 U state;
871
872 Box() {}
873
874 public U get() {
875 return state;
876 }
877 }
878
879
889 private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
890 implements TerminalOp<T, R> {
891 private final StreamShape inputShape;
892
893
899 ReduceOp(StreamShape shape) {
900 inputShape = shape;
901 }
902
903 public abstract S makeSink();
904
905 @Override
906 public StreamShape inputShape() {
907 return inputShape;
908 }
909
910 @Override
911 public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
912 Spliterator<P_IN> spliterator) {
913 return helper.wrapAndCopyInto(makeSink(), spliterator).get();
914 }
915
916 @Override
917 public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
918 Spliterator<P_IN> spliterator) {
919 return new ReduceTask<>(this, helper, spliterator).invoke().get();
920 }
921 }
922
923
926 @SuppressWarnings("serial")
927 private static final class ReduceTask<P_IN, P_OUT, R,
928 S extends AccumulatingSink<P_OUT, R, S>>
929 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
930 private final ReduceOp<P_OUT, R, S> op;
931
932 ReduceTask(ReduceOp<P_OUT, R, S> op,
933 PipelineHelper<P_OUT> helper,
934 Spliterator<P_IN> spliterator) {
935 super(helper, spliterator);
936 this.op = op;
937 }
938
939 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
940 Spliterator<P_IN> spliterator) {
941 super(parent, spliterator);
942 this.op = parent.op;
943 }
944
945 @Override
946 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
947 return new ReduceTask<>(this, spliterator);
948 }
949
950 @Override
951 protected S doLeaf() {
952 return helper.wrapAndCopyInto(op.makeSink(), spliterator);
953 }
954
955 @Override
956 public void onCompletion(CountedCompleter<?> caller) {
957 if (!isLeaf()) {
958 S leftResult = leftChild.getLocalResult();
959 leftResult.combine(rightChild.getLocalResult());
960 setLocalResult(leftResult);
961 }
962
963 super.onCompletion(caller);
964 }
965 }
966 }
967