1
25 package java.util.stream;
26
27 import java.util.Comparator;
28 import java.util.Iterator;
29 import java.util.Objects;
30 import java.util.Optional;
31 import java.util.Spliterator;
32 import java.util.Spliterators;
33 import java.util.function.BiConsumer;
34 import java.util.function.BiFunction;
35 import java.util.function.BinaryOperator;
36 import java.util.function.Consumer;
37 import java.util.function.DoubleConsumer;
38 import java.util.function.Function;
39 import java.util.function.IntConsumer;
40 import java.util.function.IntFunction;
41 import java.util.function.LongConsumer;
42 import java.util.function.Predicate;
43 import java.util.function.Supplier;
44 import java.util.function.ToDoubleFunction;
45 import java.util.function.ToIntFunction;
46 import java.util.function.ToLongFunction;
47
48
57 abstract class ReferencePipeline<P_IN, P_OUT>
58 extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
59 implements Stream<P_OUT> {
60
61
/**
 * Constructor taking a supplier of the source spliterator.
 * All three arguments are handed, unchanged, to the superclass constructor.
 *
 * @param source supplier of the source spliterator
 * @param sourceFlags flags value passed through to the superclass
 * @param parallel parallel/sequential indicator passed through to the superclass
 */
ReferencePipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}
73
74
/**
 * Constructor taking the source spliterator directly.
 * All three arguments are handed, unchanged, to the superclass constructor.
 *
 * @param source the source spliterator
 * @param sourceFlags flags value passed through to the superclass
 * @param parallel parallel/sequential indicator passed through to the superclass
 */
ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}
86
87
/**
 * Constructor that chains this stage onto an existing upstream stage.
 * Both arguments are handed, unchanged, to the superclass constructor.
 *
 * @param upstream the preceding pipeline stage, whose output type is this stage's input type
 * @param opFlags flags value for this operation, passed through to the superclass
 */
ReferencePipeline(
        AbstractPipeline<?, P_IN, ?> upstream,
        int opFlags) {
    super(upstream, opFlags);
}
96
97
98
99 @Override
100 final StreamShape getOutputShape() {
101 return StreamShape.REFERENCE;
102 }
103
104 @Override
105 final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
106 Spliterator<P_IN> spliterator,
107 boolean flattenTree,
108 IntFunction<P_OUT[]> generator) {
109 return Nodes.collect(helper, spliterator, flattenTree, generator);
110 }
111
112 @Override
113 final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
114 Supplier<Spliterator<P_IN>> supplier,
115 boolean isParallel) {
116 return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
117 }
118
119 @Override
120 final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
121 return new StreamSpliterators.DelegatingSpliterator<>(supplier);
122 }
123
124 @Override
125 final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
126 boolean cancelled;
127 do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
128 return cancelled;
129 }
130
131 @Override
132 final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
133 return Nodes.builder(exactSizeIfKnown, generator);
134 }
135
136
137
138
139 @Override
140 public final Iterator<P_OUT> iterator() {
141 return Spliterators.iterator(spliterator());
142 }
143
144
145
146
147
148
149 @Override
150 public Stream<P_OUT> unordered() {
151 if (!isOrdered())
152 return this;
153 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
154 @Override
155 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
156 return sink;
157 }
158 };
159 }
160
161 @Override
162 public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
163 Objects.requireNonNull(predicate);
164 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
165 StreamOpFlag.NOT_SIZED) {
166 @Override
167 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
168 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
169 @Override
170 public void begin(long size) {
171 downstream.begin(-1);
172 }
173
174 @Override
175 public void accept(P_OUT u) {
176 if (predicate.test(u))
177 downstream.accept(u);
178 }
179 };
180 }
181 };
182 }
183
184 @Override
185 @SuppressWarnings("unchecked")
186 public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
187 Objects.requireNonNull(mapper);
188 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
189 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
190 @Override
191 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
192 return new Sink.ChainedReference<P_OUT, R>(sink) {
193 @Override
194 public void accept(P_OUT u) {
195 downstream.accept(mapper.apply(u));
196 }
197 };
198 }
199 };
200 }
201
202 @Override
203 public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
204 Objects.requireNonNull(mapper);
205 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
206 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
207 @Override
208 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
209 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
210 @Override
211 public void accept(P_OUT u) {
212 downstream.accept(mapper.applyAsInt(u));
213 }
214 };
215 }
216 };
217 }
218
219 @Override
220 public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
221 Objects.requireNonNull(mapper);
222 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
223 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
224 @Override
225 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
226 return new Sink.ChainedReference<P_OUT, Long>(sink) {
227 @Override
228 public void accept(P_OUT u) {
229 downstream.accept(mapper.applyAsLong(u));
230 }
231 };
232 }
233 };
234 }
235
236 @Override
237 public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
238 Objects.requireNonNull(mapper);
239 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
240 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
241 @Override
242 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
243 return new Sink.ChainedReference<P_OUT, Double>(sink) {
244 @Override
245 public void accept(P_OUT u) {
246 downstream.accept(mapper.applyAsDouble(u));
247 }
248 };
249 }
250 };
251 }
252
253 @Override
254 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
255 Objects.requireNonNull(mapper);
256 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
257 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
258 @Override
259 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
260 return new Sink.ChainedReference<P_OUT, R>(sink) {
261
262 boolean cancellationRequestedCalled;
263
264 @Override
265 public void begin(long size) {
266 downstream.begin(-1);
267 }
268
269 @Override
270 public void accept(P_OUT u) {
271 try (Stream<? extends R> result = mapper.apply(u)) {
272 if (result != null) {
273 if (!cancellationRequestedCalled) {
274 result.sequential().forEach(downstream);
275 }
276 else {
277 var s = result.sequential().spliterator();
278 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
279 }
280 }
281 }
282 }
283
284 @Override
285 public boolean cancellationRequested() {
286
287
288
289
290 cancellationRequestedCalled = true;
291 return downstream.cancellationRequested();
292 }
293 };
294 }
295 };
296 }
297
298 @Override
299 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
300 Objects.requireNonNull(mapper);
301 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
302 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
303 @Override
304 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
305 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
306
307 boolean cancellationRequestedCalled;
308
309
310 IntConsumer downstreamAsInt = downstream::accept;
311
312 @Override
313 public void begin(long size) {
314 downstream.begin(-1);
315 }
316
317 @Override
318 public void accept(P_OUT u) {
319 try (IntStream result = mapper.apply(u)) {
320 if (result != null) {
321 if (!cancellationRequestedCalled) {
322 result.sequential().forEach(downstreamAsInt);
323 }
324 else {
325 var s = result.sequential().spliterator();
326 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
327 }
328 }
329 }
330 }
331
332 @Override
333 public boolean cancellationRequested() {
334 cancellationRequestedCalled = true;
335 return downstream.cancellationRequested();
336 }
337 };
338 }
339 };
340 }
341
342 @Override
343 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
344 Objects.requireNonNull(mapper);
345 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
346 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
347 @Override
348 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
349 return new Sink.ChainedReference<P_OUT, Double>(sink) {
350
351 boolean cancellationRequestedCalled;
352
353
354 DoubleConsumer downstreamAsDouble = downstream::accept;
355
356 @Override
357 public void begin(long size) {
358 downstream.begin(-1);
359 }
360
361 @Override
362 public void accept(P_OUT u) {
363 try (DoubleStream result = mapper.apply(u)) {
364 if (result != null) {
365 if (!cancellationRequestedCalled) {
366 result.sequential().forEach(downstreamAsDouble);
367 }
368 else {
369 var s = result.sequential().spliterator();
370 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
371 }
372 }
373 }
374 }
375
376 @Override
377 public boolean cancellationRequested() {
378 cancellationRequestedCalled = true;
379 return downstream.cancellationRequested();
380 }
381 };
382 }
383 };
384 }
385
386 @Override
387 public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
388 Objects.requireNonNull(mapper);
389
390 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
391 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
392 @Override
393 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
394 return new Sink.ChainedReference<P_OUT, Long>(sink) {
395
396 boolean cancellationRequestedCalled;
397
398
399 LongConsumer downstreamAsLong = downstream::accept;
400
401 @Override
402 public void begin(long size) {
403 downstream.begin(-1);
404 }
405
406 @Override
407 public void accept(P_OUT u) {
408 try (LongStream result = mapper.apply(u)) {
409 if (result != null) {
410 if (!cancellationRequestedCalled) {
411 result.sequential().forEach(downstreamAsLong);
412 }
413 else {
414 var s = result.sequential().spliterator();
415 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
416 }
417 }
418 }
419 }
420
421 @Override
422 public boolean cancellationRequested() {
423 cancellationRequestedCalled = true;
424 return downstream.cancellationRequested();
425 }
426 };
427 }
428 };
429 }
430
431 @Override
432 public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
433 Objects.requireNonNull(action);
434 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
435 0) {
436 @Override
437 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
438 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
439 @Override
440 public void accept(P_OUT u) {
441 action.accept(u);
442 downstream.accept(u);
443 }
444 };
445 }
446 };
447 }
448
449
450
451 @Override
452 public final Stream<P_OUT> distinct() {
453 return DistinctOps.makeRef(this);
454 }
455
456 @Override
457 public final Stream<P_OUT> sorted() {
458 return SortedOps.makeRef(this);
459 }
460
461 @Override
462 public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
463 return SortedOps.makeRef(this, comparator);
464 }
465
466 @Override
467 public final Stream<P_OUT> limit(long maxSize) {
468 if (maxSize < 0)
469 throw new IllegalArgumentException(Long.toString(maxSize));
470 return SliceOps.makeRef(this, 0, maxSize);
471 }
472
473 @Override
474 public final Stream<P_OUT> skip(long n) {
475 if (n < 0)
476 throw new IllegalArgumentException(Long.toString(n));
477 if (n == 0)
478 return this;
479 else
480 return SliceOps.makeRef(this, n, -1);
481 }
482
483 @Override
484 public final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {
485 return WhileOps.makeTakeWhileRef(this, predicate);
486 }
487
488 @Override
489 public final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {
490 return WhileOps.makeDropWhileRef(this, predicate);
491 }
492
493
494
495 @Override
496 public void forEach(Consumer<? super P_OUT> action) {
497 evaluate(ForEachOps.makeRef(action, false));
498 }
499
500 @Override
501 public void forEachOrdered(Consumer<? super P_OUT> action) {
502 evaluate(ForEachOps.makeRef(action, true));
503 }
504
505 @Override
506 @SuppressWarnings("unchecked")
507 public final <A> A[] toArray(IntFunction<A[]> generator) {
508
509
510
511
512
513
514
515 @SuppressWarnings("rawtypes")
516 IntFunction rawGenerator = (IntFunction) generator;
517 return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
518 .asArray(rawGenerator);
519 }
520
521 @Override
522 public final Object[] toArray() {
523 return toArray(Object[]::new);
524 }
525
526 @Override
527 public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
528 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
529 }
530
531 @Override
532 public final boolean allMatch(Predicate<? super P_OUT> predicate) {
533 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
534 }
535
536 @Override
537 public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
538 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
539 }
540
541 @Override
542 public final Optional<P_OUT> findFirst() {
543 return evaluate(FindOps.makeRef(true));
544 }
545
546 @Override
547 public final Optional<P_OUT> findAny() {
548 return evaluate(FindOps.makeRef(false));
549 }
550
551 @Override
552 public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
553 return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
554 }
555
556 @Override
557 public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
558 return evaluate(ReduceOps.makeRef(accumulator));
559 }
560
561 @Override
562 public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
563 return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
564 }
565
566 @Override
567 @SuppressWarnings("unchecked")
568 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
569 A container;
570 if (isParallel()
571 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
572 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
573 container = collector.supplier().get();
574 BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
575 forEach(u -> accumulator.accept(container, u));
576 }
577 else {
578 container = evaluate(ReduceOps.makeRef(collector));
579 }
580 return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
581 ? (R) container
582 : collector.finisher().apply(container);
583 }
584
585 @Override
586 public final <R> R collect(Supplier<R> supplier,
587 BiConsumer<R, ? super P_OUT> accumulator,
588 BiConsumer<R, R> combiner) {
589 return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
590 }
591
592 @Override
593 public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
594 return reduce(BinaryOperator.maxBy(comparator));
595 }
596
597 @Override
598 public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
599 return reduce(BinaryOperator.minBy(comparator));
600
601 }
602
603 @Override
604 public final long count() {
605 return evaluate(ReduceOps.makeRefCounting());
606 }
607
608
609
610
617 static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
618
626 Head(Supplier<? extends Spliterator<?>> source,
627 int sourceFlags, boolean parallel) {
628 super(source, sourceFlags, parallel);
629 }
630
631
638 Head(Spliterator<?> source,
639 int sourceFlags, boolean parallel) {
640 super(source, sourceFlags, parallel);
641 }
642
643 @Override
644 final boolean opIsStateful() {
645 throw new UnsupportedOperationException();
646 }
647
648 @Override
649 final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
650 throw new UnsupportedOperationException();
651 }
652
653
654
655 @Override
656 public void forEach(Consumer<? super E_OUT> action) {
657 if (!isParallel()) {
658 sourceStageSpliterator().forEachRemaining(action);
659 }
660 else {
661 super.forEach(action);
662 }
663 }
664
665 @Override
666 public void forEachOrdered(Consumer<? super E_OUT> action) {
667 if (!isParallel()) {
668 sourceStageSpliterator().forEachRemaining(action);
669 }
670 else {
671 super.forEachOrdered(action);
672 }
673 }
674 }
675
676
683 abstract static class StatelessOp<E_IN, E_OUT>
684 extends ReferencePipeline<E_IN, E_OUT> {
685
693 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
694 StreamShape inputShape,
695 int opFlags) {
696 super(upstream, opFlags);
697 assert upstream.getOutputShape() == inputShape;
698 }
699
700 @Override
701 final boolean opIsStateful() {
702 return false;
703 }
704 }
705
706
713 abstract static class StatefulOp<E_IN, E_OUT>
714 extends ReferencePipeline<E_IN, E_OUT> {
715
722 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
723 StreamShape inputShape,
724 int opFlags) {
725 super(upstream, opFlags);
726 assert upstream.getOutputShape() == inputShape;
727 }
728
729 @Override
730 final boolean opIsStateful() {
731 return true;
732 }
733
734 @Override
735 abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
736 Spliterator<P_IN> spliterator,
737 IntFunction<E_OUT[]> generator);
738 }
739 }
740