1 /*
2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */

25 package java.util.stream;
26
27 import java.util.Objects;
28 import java.util.Optional;
29 import java.util.OptionalDouble;
30 import java.util.OptionalInt;
31 import java.util.OptionalLong;
32 import java.util.Spliterator;
33 import java.util.concurrent.CountedCompleter;
34 import java.util.function.BiConsumer;
35 import java.util.function.BiFunction;
36 import java.util.function.BinaryOperator;
37 import java.util.function.DoubleBinaryOperator;
38 import java.util.function.IntBinaryOperator;
39 import java.util.function.LongBinaryOperator;
40 import java.util.function.ObjDoubleConsumer;
41 import java.util.function.ObjIntConsumer;
42 import java.util.function.ObjLongConsumer;
43 import java.util.function.Supplier;
44
45 /**
46  * Factory for creating instances of {@code TerminalOp} that implement
47  * reductions.
48  *
49  * @since 1.8
50  */

51 final class ReduceOps {
52
53     private ReduceOps() { }
54
55     /**
56      * Constructs a {@code TerminalOp} that implements a functional reduce on
57      * reference values.
58      *
59      * @param <T> the type of the input elements
60      * @param <U> the type of the result
61      * @param seed the identity element for the reduction
62      * @param reducer the accumulating function that incorporates an additional
63      *        input element into the result
64      * @param combiner the combining function that combines two intermediate
65      *        results
66      * @return a {@code TerminalOp} implementing the reduction
67      */

68     public static <T, U> TerminalOp<T, U>
69     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
70         Objects.requireNonNull(reducer);
71         Objects.requireNonNull(combiner);
72         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
73             @Override
74             public void begin(long size) {
75                 state = seed;
76             }
77
78             @Override
79             public void accept(T t) {
80                 state = reducer.apply(state, t);
81             }
82
83             @Override
84             public void combine(ReducingSink other) {
85                 state = combiner.apply(state, other.state);
86             }
87         }
88         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
89             @Override
90             public ReducingSink makeSink() {
91                 return new ReducingSink();
92             }
93         };
94     }
95
96     /**
97      * Constructs a {@code TerminalOp} that implements a functional reduce on
98      * reference values producing an optional reference result.
99      *
100      * @param <T> The type of the input elements, and the type of the result
101      * @param operator The reducing function
102      * @return A {@code TerminalOp} implementing the reduction
103      */

104     public static <T> TerminalOp<T, Optional<T>>
105     makeRef(BinaryOperator<T> operator) {
106         Objects.requireNonNull(operator);
107         class ReducingSink
108                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
109             private boolean empty;
110             private T state;
111
112             public void begin(long size) {
113                 empty = true;
114                 state = null;
115             }
116
117             @Override
118             public void accept(T t) {
119                 if (empty) {
120                     empty = false;
121                     state = t;
122                 } else {
123                     state = operator.apply(state, t);
124                 }
125             }
126
127             @Override
128             public Optional<T> get() {
129                 return empty ? Optional.empty() : Optional.of(state);
130             }
131
132             @Override
133             public void combine(ReducingSink other) {
134                 if (!other.empty)
135                     accept(other.state);
136             }
137         }
138         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139             @Override
140             public ReducingSink makeSink() {
141                 return new ReducingSink();
142             }
143         };
144     }
145
146     /**
147      * Constructs a {@code TerminalOp} that implements a mutable reduce on
148      * reference values.
149      *
150      * @param <T> the type of the input elements
151      * @param <I> the type of the intermediate reduction result
152      * @param collector a {@code Collector} defining the reduction
153      * @return a {@code ReduceOp} implementing the reduction
154      */

155     public static <T, I> TerminalOp<T, I>
156     makeRef(Collector<? super T, I, ?> collector) {
157         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
158         BiConsumer<I, ? super T> accumulator = collector.accumulator();
159         BinaryOperator<I> combiner = collector.combiner();
160         class ReducingSink extends Box<I>
161                 implements AccumulatingSink<T, I, ReducingSink> {
162             @Override
163             public void begin(long size) {
164                 state = supplier.get();
165             }
166
167             @Override
168             public void accept(T t) {
169                 accumulator.accept(state, t);
170             }
171
172             @Override
173             public void combine(ReducingSink other) {
174                 state = combiner.apply(state, other.state);
175             }
176         }
177         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
178             @Override
179             public ReducingSink makeSink() {
180                 return new ReducingSink();
181             }
182
183             @Override
184             public int getOpFlags() {
185                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
186                        ? StreamOpFlag.NOT_ORDERED
187                        : 0;
188             }
189         };
190     }
191
192     /**
193      * Constructs a {@code TerminalOp} that implements a mutable reduce on
194      * reference values.
195      *
196      * @param <T> the type of the input elements
197      * @param <R> the type of the result
198      * @param seedFactory a factory to produce a new base accumulator
199      * @param accumulator a function to incorporate an element into an
200      *        accumulator
201      * @param reducer a function to combine an accumulator into another
202      * @return a {@code TerminalOp} implementing the reduction
203      */

204     public static <T, R> TerminalOp<T, R>
205     makeRef(Supplier<R> seedFactory,
206             BiConsumer<R, ? super T> accumulator,
207             BiConsumer<R,R> reducer) {
208         Objects.requireNonNull(seedFactory);
209         Objects.requireNonNull(accumulator);
210         Objects.requireNonNull(reducer);
211         class ReducingSink extends Box<R>
212                 implements AccumulatingSink<T, R, ReducingSink> {
213             @Override
214             public void begin(long size) {
215                 state = seedFactory.get();
216             }
217
218             @Override
219             public void accept(T t) {
220                 accumulator.accept(state, t);
221             }
222
223             @Override
224             public void combine(ReducingSink other) {
225                 reducer.accept(state, other.state);
226             }
227         }
228         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
229             @Override
230             public ReducingSink makeSink() {
231                 return new ReducingSink();
232             }
233         };
234     }
235
236     /**
237      * Constructs a {@code TerminalOp} that counts the number of stream
238      * elements.  If the size of the pipeline is known then count is the size
239      * and there is no need to evaluate the pipeline.  If the size of the
240      * pipeline is non known then count is produced, via reduction, using a
241      * {@link CountingSink}.
242      *
243      * @param <T> the type of the input elements
244      * @return a {@code TerminalOp} implementing the counting
245      */

246     public static <T> TerminalOp<T, Long>
247     makeRefCounting() {
248         return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
249             @Override
250             public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
251
252             @Override
253             public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
254                                                   Spliterator<P_IN> spliterator) {
255                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
256                     return spliterator.getExactSizeIfKnown();
257                 return super.evaluateSequential(helper, spliterator);
258             }
259
260             @Override
261             public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
262                                                 Spliterator<P_IN> spliterator) {
263                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
264                     return spliterator.getExactSizeIfKnown();
265                 return super.evaluateParallel(helper, spliterator);
266             }
267
268             @Override
269             public int getOpFlags() {
270                 return StreamOpFlag.NOT_ORDERED;
271             }
272         };
273     }
274
275     /**
276      * Constructs a {@code TerminalOp} that implements a functional reduce on
277      * {@code int} values.
278      *
279      * @param identity the identity for the combining function
280      * @param operator the combining function
281      * @return a {@code TerminalOp} implementing the reduction
282      */

283     public static TerminalOp<Integer, Integer>
284     makeInt(int identity, IntBinaryOperator operator) {
285         Objects.requireNonNull(operator);
286         class ReducingSink
287                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
288             private int state;
289
290             @Override
291             public void begin(long size) {
292                 state = identity;
293             }
294
295             @Override
296             public void accept(int t) {
297                 state = operator.applyAsInt(state, t);
298             }
299
300             @Override
301             public Integer get() {
302                 return state;
303             }
304
305             @Override
306             public void combine(ReducingSink other) {
307                 accept(other.state);
308             }
309         }
310         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
311             @Override
312             public ReducingSink makeSink() {
313                 return new ReducingSink();
314             }
315         };
316     }
317
318     /**
319      * Constructs a {@code TerminalOp} that implements a functional reduce on
320      * {@code int} values, producing an optional integer result.
321      *
322      * @param operator the combining function
323      * @return a {@code TerminalOp} implementing the reduction
324      */

325     public static TerminalOp<Integer, OptionalInt>
326     makeInt(IntBinaryOperator operator) {
327         Objects.requireNonNull(operator);
328         class ReducingSink
329                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
330             private boolean empty;
331             private int state;
332
333             public void begin(long size) {
334                 empty = true;
335                 state = 0;
336             }
337
338             @Override
339             public void accept(int t) {
340                 if (empty) {
341                     empty = false;
342                     state = t;
343                 }
344                 else {
345                     state = operator.applyAsInt(state, t);
346                 }
347             }
348
349             @Override
350             public OptionalInt get() {
351                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
352             }
353
354             @Override
355             public void combine(ReducingSink other) {
356                 if (!other.empty)
357                     accept(other.state);
358             }
359         }
360         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
361             @Override
362             public ReducingSink makeSink() {
363                 return new ReducingSink();
364             }
365         };
366     }
367
368     /**
369      * Constructs a {@code TerminalOp} that implements a mutable reduce on
370      * {@code int} values.
371      *
372      * @param <R> The type of the result
373      * @param supplier a factory to produce a new accumulator of the result type
374      * @param accumulator a function to incorporate an int into an
375      *        accumulator
376      * @param combiner a function to combine an accumulator into another
377      * @return A {@code ReduceOp} implementing the reduction
378      */

379     public static <R> TerminalOp<Integer, R>
380     makeInt(Supplier<R> supplier,
381             ObjIntConsumer<R> accumulator,
382             BinaryOperator<R> combiner) {
383         Objects.requireNonNull(supplier);
384         Objects.requireNonNull(accumulator);
385         Objects.requireNonNull(combiner);
386         class ReducingSink extends Box<R>
387                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
388             @Override
389             public void begin(long size) {
390                 state = supplier.get();
391             }
392
393             @Override
394             public void accept(int t) {
395                 accumulator.accept(state, t);
396             }
397
398             @Override
399             public void combine(ReducingSink other) {
400                 state = combiner.apply(state, other.state);
401             }
402         }
403         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
404             @Override
405             public ReducingSink makeSink() {
406                 return new ReducingSink();
407             }
408         };
409     }
410
411     /**
412      * Constructs a {@code TerminalOp} that counts the number of stream
413      * elements.  If the size of the pipeline is known then count is the size
414      * and there is no need to evaluate the pipeline.  If the size of the
415      * pipeline is non known then count is produced, via reduction, using a
416      * {@link CountingSink}.
417      *
418      * @return a {@code TerminalOp} implementing the counting
419      */

420     public static TerminalOp<Integer, Long>
421     makeIntCounting() {
422         return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
423             @Override
424             public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
425
426             @Override
427             public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
428                                                   Spliterator<P_IN> spliterator) {
429                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
430                     return spliterator.getExactSizeIfKnown();
431                 return super.evaluateSequential(helper, spliterator);
432             }
433
434             @Override
435             public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
436                                                 Spliterator<P_IN> spliterator) {
437                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
438                     return spliterator.getExactSizeIfKnown();
439                 return super.evaluateParallel(helper, spliterator);
440             }
441
442             @Override
443             public int getOpFlags() {
444                 return StreamOpFlag.NOT_ORDERED;
445             }
446         };
447     }
448
449     /**
450      * Constructs a {@code TerminalOp} that implements a functional reduce on
451      * {@code long} values.
452      *
453      * @param identity the identity for the combining function
454      * @param operator the combining function
455      * @return a {@code TerminalOp} implementing the reduction
456      */

457     public static TerminalOp<Long, Long>
458     makeLong(long identity, LongBinaryOperator operator) {
459         Objects.requireNonNull(operator);
460         class ReducingSink
461                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
462             private long state;
463
464             @Override
465             public void begin(long size) {
466                 state = identity;
467             }
468
469             @Override
470             public void accept(long t) {
471                 state = operator.applyAsLong(state, t);
472             }
473
474             @Override
475             public Long get() {
476                 return state;
477             }
478
479             @Override
480             public void combine(ReducingSink other) {
481                 accept(other.state);
482             }
483         }
484         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
485             @Override
486             public ReducingSink makeSink() {
487                 return new ReducingSink();
488             }
489         };
490     }
491
492     /**
493      * Constructs a {@code TerminalOp} that implements a functional reduce on
494      * {@code long} values, producing an optional long result.
495      *
496      * @param operator the combining function
497      * @return a {@code TerminalOp} implementing the reduction
498      */

499     public static TerminalOp<Long, OptionalLong>
500     makeLong(LongBinaryOperator operator) {
501         Objects.requireNonNull(operator);
502         class ReducingSink
503                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
504             private boolean empty;
505             private long state;
506
507             public void begin(long size) {
508                 empty = true;
509                 state = 0;
510             }
511
512             @Override
513             public void accept(long t) {
514                 if (empty) {
515                     empty = false;
516                     state = t;
517                 }
518                 else {
519                     state = operator.applyAsLong(state, t);
520                 }
521             }
522
523             @Override
524             public OptionalLong get() {
525                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
526             }
527
528             @Override
529             public void combine(ReducingSink other) {
530                 if (!other.empty)
531                     accept(other.state);
532             }
533         }
534         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
535             @Override
536             public ReducingSink makeSink() {
537                 return new ReducingSink();
538             }
539         };
540     }
541
542     /**
543      * Constructs a {@code TerminalOp} that implements a mutable reduce on
544      * {@code long} values.
545      *
546      * @param <R> the type of the result
547      * @param supplier a factory to produce a new accumulator of the result type
548      * @param accumulator a function to incorporate an int into an
549      *        accumulator
550      * @param combiner a function to combine an accumulator into another
551      * @return a {@code TerminalOp} implementing the reduction
552      */

553     public static <R> TerminalOp<Long, R>
554     makeLong(Supplier<R> supplier,
555              ObjLongConsumer<R> accumulator,
556              BinaryOperator<R> combiner) {
557         Objects.requireNonNull(supplier);
558         Objects.requireNonNull(accumulator);
559         Objects.requireNonNull(combiner);
560         class ReducingSink extends Box<R>
561                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
562             @Override
563             public void begin(long size) {
564                 state = supplier.get();
565             }
566
567             @Override
568             public void accept(long t) {
569                 accumulator.accept(state, t);
570             }
571
572             @Override
573             public void combine(ReducingSink other) {
574                 state = combiner.apply(state, other.state);
575             }
576         }
577         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
578             @Override
579             public ReducingSink makeSink() {
580                 return new ReducingSink();
581             }
582         };
583     }
584
585     /**
586      * Constructs a {@code TerminalOp} that counts the number of stream
587      * elements.  If the size of the pipeline is known then count is the size
588      * and there is no need to evaluate the pipeline.  If the size of the
589      * pipeline is non known then count is produced, via reduction, using a
590      * {@link CountingSink}.
591      *
592      * @return a {@code TerminalOp} implementing the counting
593      */

594     public static TerminalOp<Long, Long>
595     makeLongCounting() {
596         return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
597             @Override
598             public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
599
600             @Override
601             public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
602                                                   Spliterator<P_IN> spliterator) {
603                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
604                     return spliterator.getExactSizeIfKnown();
605                 return super.evaluateSequential(helper, spliterator);
606             }
607
608             @Override
609             public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
610                                                 Spliterator<P_IN> spliterator) {
611                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
612                     return spliterator.getExactSizeIfKnown();
613                 return super.evaluateParallel(helper, spliterator);
614             }
615
616             @Override
617             public int getOpFlags() {
618                 return StreamOpFlag.NOT_ORDERED;
619             }
620         };
621     }
622
623     /**
624      * Constructs a {@code TerminalOp} that implements a functional reduce on
625      * {@code double} values.
626      *
627      * @param identity the identity for the combining function
628      * @param operator the combining function
629      * @return a {@code TerminalOp} implementing the reduction
630      */

631     public static TerminalOp<Double, Double>
632     makeDouble(double identity, DoubleBinaryOperator operator) {
633         Objects.requireNonNull(operator);
634         class ReducingSink
635                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
636             private double state;
637
638             @Override
639             public void begin(long size) {
640                 state = identity;
641             }
642
643             @Override
644             public void accept(double t) {
645                 state = operator.applyAsDouble(state, t);
646             }
647
648             @Override
649             public Double get() {
650                 return state;
651             }
652
653             @Override
654             public void combine(ReducingSink other) {
655                 accept(other.state);
656             }
657         }
658         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
659             @Override
660             public ReducingSink makeSink() {
661                 return new ReducingSink();
662             }
663         };
664     }
665
666     /**
667      * Constructs a {@code TerminalOp} that implements a functional reduce on
668      * {@code double} values, producing an optional double result.
669      *
670      * @param operator the combining function
671      * @return a {@code TerminalOp} implementing the reduction
672      */

673     public static TerminalOp<Double, OptionalDouble>
674     makeDouble(DoubleBinaryOperator operator) {
675         Objects.requireNonNull(operator);
676         class ReducingSink
677                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
678             private boolean empty;
679             private double state;
680
681             public void begin(long size) {
682                 empty = true;
683                 state = 0;
684             }
685
686             @Override
687             public void accept(double t) {
688                 if (empty) {
689                     empty = false;
690                     state = t;
691                 }
692                 else {
693                     state = operator.applyAsDouble(state, t);
694                 }
695             }
696
697             @Override
698             public OptionalDouble get() {
699                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
700             }
701
702             @Override
703             public void combine(ReducingSink other) {
704                 if (!other.empty)
705                     accept(other.state);
706             }
707         }
708         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
709             @Override
710             public ReducingSink makeSink() {
711                 return new ReducingSink();
712             }
713         };
714     }
715
716     /**
717      * Constructs a {@code TerminalOp} that implements a mutable reduce on
718      * {@code double} values.
719      *
720      * @param <R> the type of the result
721      * @param supplier a factory to produce a new accumulator of the result type
722      * @param accumulator a function to incorporate an int into an
723      *        accumulator
724      * @param combiner a function to combine an accumulator into another
725      * @return a {@code TerminalOp} implementing the reduction
726      */

727     public static <R> TerminalOp<Double, R>
728     makeDouble(Supplier<R> supplier,
729                ObjDoubleConsumer<R> accumulator,
730                BinaryOperator<R> combiner) {
731         Objects.requireNonNull(supplier);
732         Objects.requireNonNull(accumulator);
733         Objects.requireNonNull(combiner);
734         class ReducingSink extends Box<R>
735                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
736             @Override
737             public void begin(long size) {
738                 state = supplier.get();
739             }
740
741             @Override
742             public void accept(double t) {
743                 accumulator.accept(state, t);
744             }
745
746             @Override
747             public void combine(ReducingSink other) {
748                 state = combiner.apply(state, other.state);
749             }
750         }
751         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
752             @Override
753             public ReducingSink makeSink() {
754                 return new ReducingSink();
755             }
756         };
757     }
758
759     /**
760      * Constructs a {@code TerminalOp} that counts the number of stream
761      * elements.  If the size of the pipeline is known then count is the size
762      * and there is no need to evaluate the pipeline.  If the size of the
763      * pipeline is non known then count is produced, via reduction, using a
764      * {@link CountingSink}.
765      *
766      * @return a {@code TerminalOp} implementing the counting
767      */

768     public static TerminalOp<Double, Long>
769     makeDoubleCounting() {
770         return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
771             @Override
772             public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
773
774             @Override
775             public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
776                                                   Spliterator<P_IN> spliterator) {
777                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
778                     return spliterator.getExactSizeIfKnown();
779                 return super.evaluateSequential(helper, spliterator);
780             }
781
782             @Override
783             public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
784                                                 Spliterator<P_IN> spliterator) {
785                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
786                     return spliterator.getExactSizeIfKnown();
787                 return super.evaluateParallel(helper, spliterator);
788             }
789
790             @Override
791             public int getOpFlags() {
792                 return StreamOpFlag.NOT_ORDERED;
793             }
794         };
795     }
796
797     /**
798      * A sink that counts elements
799      */

800     abstract static class CountingSink<T>
801             extends Box<Long>
802             implements AccumulatingSink<T, Long, CountingSink<T>> {
803         long count;
804
805         @Override
806         public void begin(long size) {
807             count = 0L;
808         }
809
810         @Override
811         public Long get() {
812             return count;
813         }
814
815         @Override
816         public void combine(CountingSink<T> other) {
817             count += other.count;
818         }
819
820         static final class OfRef<T> extends CountingSink<T> {
821             @Override
822             public void accept(T t) {
823                 count++;
824             }
825         }
826
827         static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
828             @Override
829             public void accept(int t) {
830                 count++;
831             }
832         }
833
834         static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
835             @Override
836             public void accept(long t) {
837                 count++;
838             }
839         }
840
841         static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
842             @Override
843             public void accept(double t) {
844                 count++;
845             }
846         }
847     }
848
849     /**
850      * A type of {@code TerminalSink} that implements an associative reducing
851      * operation on elements of type {@code T} and producing a result of type
852      * {@code R}.
853      *
854      * @param <T> the type of input element to the combining operation
855      * @param <R> the result type
856      * @param <K> the type of the {@code AccumulatingSink}.
857      */

858     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
859             extends TerminalSink<T, R> {
860         void combine(K other);
861     }
862
863     /**
864      * State box for a single state element, used as a base class for
865      * {@code AccumulatingSink} instances
866      *
867      * @param <U> The type of the state element
868      */

869     private abstract static class Box<U> {
870         U state;
871
872         Box() {} // Avoid creation of special accessor
873
874         public U get() {
875             return state;
876         }
877     }
878
879     /**
880      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
881      * output into an {@code AccumulatingSink}, which performs a reduce
882      * operation. The {@code AccumulatingSink} must represent an associative
883      * reducing operation.
884      *
885      * @param <T> the output type of the stream pipeline
886      * @param <R> the result type of the reducing operation
887      * @param <S> the type of the {@code AccumulatingSink}
888      */

889     private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
890             implements TerminalOp<T, R> {
891         private final StreamShape inputShape;
892
893         /**
894          * Create a {@code ReduceOp} of the specified stream shape which uses
895          * the specified {@code Supplier} to create accumulating sinks.
896          *
897          * @param shape The shape of the stream pipeline
898          */

899         ReduceOp(StreamShape shape) {
900             inputShape = shape;
901         }
902
903         public abstract S makeSink();
904
905         @Override
906         public StreamShape inputShape() {
907             return inputShape;
908         }
909
910         @Override
911         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
912                                            Spliterator<P_IN> spliterator) {
913             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
914         }
915
916         @Override
917         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
918                                          Spliterator<P_IN> spliterator) {
919             return new ReduceTask<>(this, helper, spliterator).invoke().get();
920         }
921     }
922
923     /**
924      * A {@code ForkJoinTask} for performing a parallel reduce operation.
925      */

926     @SuppressWarnings("serial")
927     private static final class ReduceTask<P_IN, P_OUT, R,
928                                           S extends AccumulatingSink<P_OUT, R, S>>
929             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
930         private final ReduceOp<P_OUT, R, S> op;
931
932         ReduceTask(ReduceOp<P_OUT, R, S> op,
933                    PipelineHelper<P_OUT> helper,
934                    Spliterator<P_IN> spliterator) {
935             super(helper, spliterator);
936             this.op = op;
937         }
938
939         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
940                    Spliterator<P_IN> spliterator) {
941             super(parent, spliterator);
942             this.op = parent.op;
943         }
944
945         @Override
946         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
947             return new ReduceTask<>(this, spliterator);
948         }
949
950         @Override
951         protected S doLeaf() {
952             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
953         }
954
955         @Override
956         public void onCompletion(CountedCompleter<?> caller) {
957             if (!isLeaf()) {
958                 S leftResult = leftChild.getLocalResult();
959                 leftResult.combine(rightChild.getLocalResult());
960                 setLocalResult(leftResult);
961             }
962             // GC spliterator, left and right child
963             super.onCompletion(caller);
964         }
965     }
966 }
967