1 /*
2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */

25 package java.util.stream;
26
27 import java.util.Comparator;
28 import java.util.Iterator;
29 import java.util.Objects;
30 import java.util.Optional;
31 import java.util.Spliterator;
32 import java.util.Spliterators;
33 import java.util.function.BiConsumer;
34 import java.util.function.BiFunction;
35 import java.util.function.BinaryOperator;
36 import java.util.function.Consumer;
37 import java.util.function.DoubleConsumer;
38 import java.util.function.Function;
39 import java.util.function.IntConsumer;
40 import java.util.function.IntFunction;
41 import java.util.function.LongConsumer;
42 import java.util.function.Predicate;
43 import java.util.function.Supplier;
44 import java.util.function.ToDoubleFunction;
45 import java.util.function.ToIntFunction;
46 import java.util.function.ToLongFunction;
47
/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code P_OUT}.
 *
 * @param <P_IN> type of elements in the upstream source
 * @param <P_OUT> type of elements produced by this stage
 *
 * @since 1.8
 */

57 abstract class ReferencePipeline<P_IN, P_OUT>
58         extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
59         implements Stream<P_OUT>  {
60
61     /**
62      * Constructor for the head of a stream pipeline.
63      *
64      * @param source {@code Supplier<Spliterator>} describing the stream source
65      * @param sourceFlags the source flags for the stream source, described in
66      *        {@link StreamOpFlag}
67      * @param parallel {@code trueif the pipeline is parallel
68      */

69     ReferencePipeline(Supplier<? extends Spliterator<?>> source,
70                       int sourceFlags, boolean parallel) {
71         super(source, sourceFlags, parallel);
72     }
73
74     /**
75      * Constructor for the head of a stream pipeline.
76      *
77      * @param source {@code Spliterator} describing the stream source
78      * @param sourceFlags The source flags for the stream source, described in
79      *        {@link StreamOpFlag}
80      * @param parallel {@code trueif the pipeline is parallel
81      */

82     ReferencePipeline(Spliterator<?> source,
83                       int sourceFlags, boolean parallel) {
84         super(source, sourceFlags, parallel);
85     }
86
87     /**
88      * Constructor for appending an intermediate operation onto an existing
89      * pipeline.
90      *
91      * @param upstream the upstream element source.
92      */

93     ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
94         super(upstream, opFlags);
95     }
96
97     // Shape-specific methods
98
99     @Override
100     final StreamShape getOutputShape() {
101         return StreamShape.REFERENCE;
102     }
103
104     @Override
105     final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
106                                         Spliterator<P_IN> spliterator,
107                                         boolean flattenTree,
108                                         IntFunction<P_OUT[]> generator) {
109         return Nodes.collect(helper, spliterator, flattenTree, generator);
110     }
111
112     @Override
113     final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
114                                      Supplier<Spliterator<P_IN>> supplier,
115                                      boolean isParallel) {
116         return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
117     }
118
119     @Override
120     final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
121         return new StreamSpliterators.DelegatingSpliterator<>(supplier);
122     }
123
124     @Override
125     final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
126         boolean cancelled;
127         do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
128         return cancelled;
129     }
130
131     @Override
132     final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
133         return Nodes.builder(exactSizeIfKnown, generator);
134     }
135
136
137     // BaseStream
138
139     @Override
140     public final Iterator<P_OUT> iterator() {
141         return Spliterators.iterator(spliterator());
142     }
143
144
145     // Stream
146
147     // Stateless intermediate operations from Stream
148
149     @Override
150     public Stream<P_OUT> unordered() {
151         if (!isOrdered())
152             return this;
153         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
154             @Override
155             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
156                 return sink;
157             }
158         };
159     }
160
161     @Override
162     public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
163         Objects.requireNonNull(predicate);
164         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
165                                      StreamOpFlag.NOT_SIZED) {
166             @Override
167             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
168                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
169                     @Override
170                     public void begin(long size) {
171                         downstream.begin(-1);
172                     }
173
174                     @Override
175                     public void accept(P_OUT u) {
176                         if (predicate.test(u))
177                             downstream.accept(u);
178                     }
179                 };
180             }
181         };
182     }
183
184     @Override
185     @SuppressWarnings("unchecked")
186     public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
187         Objects.requireNonNull(mapper);
188         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
189                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
190             @Override
191             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
192                 return new Sink.ChainedReference<P_OUT, R>(sink) {
193                     @Override
194                     public void accept(P_OUT u) {
195                         downstream.accept(mapper.apply(u));
196                     }
197                 };
198             }
199         };
200     }
201
202     @Override
203     public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
204         Objects.requireNonNull(mapper);
205         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
206                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
207             @Override
208             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
209                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
210                     @Override
211                     public void accept(P_OUT u) {
212                         downstream.accept(mapper.applyAsInt(u));
213                     }
214                 };
215             }
216         };
217     }
218
219     @Override
220     public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
221         Objects.requireNonNull(mapper);
222         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
223                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
224             @Override
225             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
226                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
227                     @Override
228                     public void accept(P_OUT u) {
229                         downstream.accept(mapper.applyAsLong(u));
230                     }
231                 };
232             }
233         };
234     }
235
236     @Override
237     public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
238         Objects.requireNonNull(mapper);
239         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
240                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
241             @Override
242             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
243                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
244                     @Override
245                     public void accept(P_OUT u) {
246                         downstream.accept(mapper.applyAsDouble(u));
247                     }
248                 };
249             }
250         };
251     }
252
253     @Override
254     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
255         Objects.requireNonNull(mapper);
256         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
257                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
258             @Override
259             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
260                 return new Sink.ChainedReference<P_OUT, R>(sink) {
261                     // true if cancellationRequested() has been called
262                     boolean cancellationRequestedCalled;
263
264                     @Override
265                     public void begin(long size) {
266                         downstream.begin(-1);
267                     }
268
269                     @Override
270                     public void accept(P_OUT u) {
271                         try (Stream<? extends R> result = mapper.apply(u)) {
272                             if (result != null) {
273                                 if (!cancellationRequestedCalled) {
274                                     result.sequential().forEach(downstream);
275                                 }
276                                 else {
277                                     var s = result.sequential().spliterator();
278                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
279                                 }
280                             }
281                         }
282                     }
283
284                     @Override
285                     public boolean cancellationRequested() {
286                         // If this method is called then an operation within the stream
287                         // pipeline is short-circuiting (see AbstractPipeline.copyInto).
288                         // Note that we cannot differentiate between an upstream or
289                         // downstream operation
290                         cancellationRequestedCalled = true;
291                         return downstream.cancellationRequested();
292                     }
293                 };
294             }
295         };
296     }
297
298     @Override
299     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
300         Objects.requireNonNull(mapper);
301         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
302                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
303             @Override
304             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
305                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
306                     // true if cancellationRequested() has been called
307                     boolean cancellationRequestedCalled;
308
309                     // cache the consumer to avoid creation on every accepted element
310                     IntConsumer downstreamAsInt = downstream::accept;
311
312                     @Override
313                     public void begin(long size) {
314                         downstream.begin(-1);
315                     }
316
317                     @Override
318                     public void accept(P_OUT u) {
319                         try (IntStream result = mapper.apply(u)) {
320                             if (result != null) {
321                                 if (!cancellationRequestedCalled) {
322                                     result.sequential().forEach(downstreamAsInt);
323                                 }
324                                 else {
325                                     var s = result.sequential().spliterator();
326                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
327                                 }
328                             }
329                         }
330                     }
331
332                     @Override
333                     public boolean cancellationRequested() {
334                         cancellationRequestedCalled = true;
335                         return downstream.cancellationRequested();
336                     }
337                 };
338             }
339         };
340     }
341
342     @Override
343     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
344         Objects.requireNonNull(mapper);
345         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
346                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
347             @Override
348             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
349                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
350                     // true if cancellationRequested() has been called
351                     boolean cancellationRequestedCalled;
352
353                     // cache the consumer to avoid creation on every accepted element
354                     DoubleConsumer downstreamAsDouble = downstream::accept;
355
356                     @Override
357                     public void begin(long size) {
358                         downstream.begin(-1);
359                     }
360
361                     @Override
362                     public void accept(P_OUT u) {
363                         try (DoubleStream result = mapper.apply(u)) {
364                             if (result != null) {
365                                 if (!cancellationRequestedCalled) {
366                                     result.sequential().forEach(downstreamAsDouble);
367                                 }
368                                 else {
369                                     var s = result.sequential().spliterator();
370                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
371                                 }
372                             }
373                         }
374                     }
375
376                     @Override
377                     public boolean cancellationRequested() {
378                         cancellationRequestedCalled = true;
379                         return downstream.cancellationRequested();
380                     }
381                 };
382             }
383         };
384     }
385
386     @Override
387     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
388         Objects.requireNonNull(mapper);
389         // We can do better than this, by polling cancellationRequested when stream is infinite
390         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
391                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
392             @Override
393             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
394                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
395                     // true if cancellationRequested() has been called
396                     boolean cancellationRequestedCalled;
397
398                     // cache the consumer to avoid creation on every accepted element
399                     LongConsumer downstreamAsLong = downstream::accept;
400
401                     @Override
402                     public void begin(long size) {
403                         downstream.begin(-1);
404                     }
405
406                     @Override
407                     public void accept(P_OUT u) {
408                         try (LongStream result = mapper.apply(u)) {
409                             if (result != null) {
410                                 if (!cancellationRequestedCalled) {
411                                     result.sequential().forEach(downstreamAsLong);
412                                 }
413                                 else {
414                                     var s = result.sequential().spliterator();
415                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
416                                 }
417                             }
418                         }
419                     }
420
421                     @Override
422                     public boolean cancellationRequested() {
423                         cancellationRequestedCalled = true;
424                         return downstream.cancellationRequested();
425                     }
426                 };
427             }
428         };
429     }
430
431     @Override
432     public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
433         Objects.requireNonNull(action);
434         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
435                                      0) {
436             @Override
437             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
438                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
439                     @Override
440                     public void accept(P_OUT u) {
441                         action.accept(u);
442                         downstream.accept(u);
443                     }
444                 };
445             }
446         };
447     }
448
449     // Stateful intermediate operations from Stream
450
451     @Override
452     public final Stream<P_OUT> distinct() {
453         return DistinctOps.makeRef(this);
454     }
455
456     @Override
457     public final Stream<P_OUT> sorted() {
458         return SortedOps.makeRef(this);
459     }
460
461     @Override
462     public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
463         return SortedOps.makeRef(this, comparator);
464     }
465
466     @Override
467     public final Stream<P_OUT> limit(long maxSize) {
468         if (maxSize < 0)
469             throw new IllegalArgumentException(Long.toString(maxSize));
470         return SliceOps.makeRef(this, 0, maxSize);
471     }
472
473     @Override
474     public final Stream<P_OUT> skip(long n) {
475         if (n < 0)
476             throw new IllegalArgumentException(Long.toString(n));
477         if (n == 0)
478             return this;
479         else
480             return SliceOps.makeRef(this, n, -1);
481     }
482
483     @Override
484     public final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {
485         return WhileOps.makeTakeWhileRef(this, predicate);
486     }
487
488     @Override
489     public final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {
490         return WhileOps.makeDropWhileRef(this, predicate);
491     }
492
493     // Terminal operations from Stream
494
495     @Override
496     public void forEach(Consumer<? super P_OUT> action) {
497         evaluate(ForEachOps.makeRef(action, false));
498     }
499
500     @Override
501     public void forEachOrdered(Consumer<? super P_OUT> action) {
502         evaluate(ForEachOps.makeRef(action, true));
503     }
504
505     @Override
506     @SuppressWarnings("unchecked")
507     public final <A> A[] toArray(IntFunction<A[]> generator) {
508         // Since A has no relation to U (not possible to declare that A is an upper bound of U)
509         // there will be no static type checking.
510         // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
511         // throughout the code-base.
512         // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
513         // Runtime checking will be performed when an element is stored in A[], thus if A is not a
514         // super type of U an ArrayStoreException will be thrown.
515         @SuppressWarnings("rawtypes")
516         IntFunction rawGenerator = (IntFunction) generator;
517         return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
518                               .asArray(rawGenerator);
519     }
520
521     @Override
522     public final Object[] toArray() {
523         return toArray(Object[]::new);
524     }
525
526     @Override
527     public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
528         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
529     }
530
531     @Override
532     public final boolean allMatch(Predicate<? super P_OUT> predicate) {
533         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
534     }
535
536     @Override
537     public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
538         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
539     }
540
541     @Override
542     public final Optional<P_OUT> findFirst() {
543         return evaluate(FindOps.makeRef(true));
544     }
545
546     @Override
547     public final Optional<P_OUT> findAny() {
548         return evaluate(FindOps.makeRef(false));
549     }
550
551     @Override
552     public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
553         return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
554     }
555
556     @Override
557     public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
558         return evaluate(ReduceOps.makeRef(accumulator));
559     }
560
561     @Override
562     public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
563         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
564     }
565
566     @Override
567     @SuppressWarnings("unchecked")
568     public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
569         A container;
570         if (isParallel()
571                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
572                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
573             container = collector.supplier().get();
574             BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
575             forEach(u -> accumulator.accept(container, u));
576         }
577         else {
578             container = evaluate(ReduceOps.makeRef(collector));
579         }
580         return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
581                ? (R) container
582                : collector.finisher().apply(container);
583     }
584
585     @Override
586     public final <R> R collect(Supplier<R> supplier,
587                                BiConsumer<R, ? super P_OUT> accumulator,
588                                BiConsumer<R, R> combiner) {
589         return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
590     }
591
592     @Override
593     public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
594         return reduce(BinaryOperator.maxBy(comparator));
595     }
596
597     @Override
598     public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
599         return reduce(BinaryOperator.minBy(comparator));
600
601     }
602
603     @Override
604     public final long count() {
605         return evaluate(ReduceOps.makeRefCounting());
606     }
607
608     //
609
610     /**
611      * Source stage of a ReferencePipeline.
612      *
613      * @param <E_IN> type of elements in the upstream source
614      * @param <E_OUT> type of elements in produced by this stage
615      * @since 1.8
616      */

617     static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
618         /**
619          * Constructor for the source stage of a Stream.
620          *
621          * @param source {@code Supplier<Spliterator>} describing the stream
622          *               source
623          * @param sourceFlags the source flags for the stream source, described
624          *                    in {@link StreamOpFlag}
625          */

626         Head(Supplier<? extends Spliterator<?>> source,
627              int sourceFlags, boolean parallel) {
628             super(source, sourceFlags, parallel);
629         }
630
631         /**
632          * Constructor for the source stage of a Stream.
633          *
634          * @param source {@code Spliterator} describing the stream source
635          * @param sourceFlags the source flags for the stream source, described
636          *                    in {@link StreamOpFlag}
637          */

638         Head(Spliterator<?> source,
639              int sourceFlags, boolean parallel) {
640             super(source, sourceFlags, parallel);
641         }
642
643         @Override
644         final boolean opIsStateful() {
645             throw new UnsupportedOperationException();
646         }
647
648         @Override
649         final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
650             throw new UnsupportedOperationException();
651         }
652
653         // Optimized sequential terminal operations for the head of the pipeline
654
655         @Override
656         public void forEach(Consumer<? super E_OUT> action) {
657             if (!isParallel()) {
658                 sourceStageSpliterator().forEachRemaining(action);
659             }
660             else {
661                 super.forEach(action);
662             }
663         }
664
665         @Override
666         public void forEachOrdered(Consumer<? super E_OUT> action) {
667             if (!isParallel()) {
668                 sourceStageSpliterator().forEachRemaining(action);
669             }
670             else {
671                 super.forEachOrdered(action);
672             }
673         }
674     }
675
676     /**
677      * Base class for a stateless intermediate stage of a Stream.
678      *
679      * @param <E_IN> type of elements in the upstream source
680      * @param <E_OUT> type of elements in produced by this stage
681      * @since 1.8
682      */

683     abstract static class StatelessOp<E_IN, E_OUT>
684             extends ReferencePipeline<E_IN, E_OUT> {
685         /**
686          * Construct a new Stream by appending a stateless intermediate
687          * operation to an existing stream.
688          *
689          * @param upstream The upstream pipeline stage
690          * @param inputShape The stream shape for the upstream pipeline stage
691          * @param opFlags Operation flags for the new stage
692          */

693         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
694                     StreamShape inputShape,
695                     int opFlags) {
696             super(upstream, opFlags);
697             assert upstream.getOutputShape() == inputShape;
698         }
699
700         @Override
701         final boolean opIsStateful() {
702             return false;
703         }
704     }
705
706     /**
707      * Base class for a stateful intermediate stage of a Stream.
708      *
709      * @param <E_IN> type of elements in the upstream source
710      * @param <E_OUT> type of elements in produced by this stage
711      * @since 1.8
712      */

713     abstract static class StatefulOp<E_IN, E_OUT>
714             extends ReferencePipeline<E_IN, E_OUT> {
715         /**
716          * Construct a new Stream by appending a stateful intermediate operation
717          * to an existing stream.
718          * @param upstream The upstream pipeline stage
719          * @param inputShape The stream shape for the upstream pipeline stage
720          * @param opFlags Operation flags for the new stage
721          */

722         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
723                    StreamShape inputShape,
724                    int opFlags) {
725             super(upstream, opFlags);
726             assert upstream.getOutputShape() == inputShape;
727         }
728
729         @Override
730         final boolean opIsStateful() {
731             return true;
732         }
733
734         @Override
735         abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
736                                                        Spliterator<P_IN> spliterator,
737                                                        IntFunction<E_OUT[]> generator);
738     }
739 }
740