1 /* 2 * Copyright (c) 2013, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.DoubleSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.PrimitiveIterator; 31 import java.util.Spliterator; 32 import java.util.Spliterators; 33 import java.util.function.BiConsumer; 34 import java.util.function.BinaryOperator; 35 import java.util.function.DoubleBinaryOperator; 36 import java.util.function.DoubleConsumer; 37 import java.util.function.DoubleFunction; 38 import java.util.function.DoublePredicate; 39 import java.util.function.DoubleToIntFunction; 40 import java.util.function.DoubleToLongFunction; 41 import java.util.function.DoubleUnaryOperator; 42 import java.util.function.IntFunction; 43 import java.util.function.ObjDoubleConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code double}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * 52 * @since 1.8 53 * @hide Visible for CTS testing only (OpenJDK8 tests). 54 */ 55 // Android-changed: Made public for CTS tests only. 56 public abstract class DoublePipeline<E_IN> 57 extends AbstractPipeline<E_IN, Double, DoubleStream> 58 implements DoubleStream { 59 60 /** 61 * Constructor for the head of a stream pipeline. 62 * 63 * @param source {@code Supplier<Spliterator>} describing the stream source 64 * @param sourceFlags the source flags for the stream source, described in 65 * {@link StreamOpFlag} 66 */ DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags, boolean parallel)67 DoublePipeline(Supplier<? extends Spliterator<Double>> source, 68 int sourceFlags, boolean parallel) { 69 super(source, sourceFlags, parallel); 70 } 71 72 /** 73 * Constructor for the head of a stream pipeline. 74 * 75 * @param source {@code Spliterator} describing the stream source 76 * @param sourceFlags the source flags for the stream source, described in 77 * {@link StreamOpFlag} 78 */ DoublePipeline(Spliterator<Double> source, int sourceFlags, boolean parallel)79 DoublePipeline(Spliterator<Double> source, 80 int sourceFlags, boolean parallel) { 81 super(source, sourceFlags, parallel); 82 } 83 84 /** 85 * Constructor for appending an intermediate operation onto an existing 86 * pipeline. 87 * 88 * @param upstream the upstream element source. 89 * @param opFlags the operation flags 90 */ DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)91 DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 92 super(upstream, opFlags); 93 } 94 95 /** 96 * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply 97 * by casting. 98 */ adapt(Sink<Double> sink)99 private static DoubleConsumer adapt(Sink<Double> sink) { 100 if (sink instanceof DoubleConsumer) { 101 return (DoubleConsumer) sink; 102 } else { 103 if (Tripwire.ENABLED) 104 Tripwire.trip(AbstractPipeline.class, 105 "using DoubleStream.adapt(Sink<Double> s)"); 106 return sink::accept; 107 } 108 } 109 110 /** 111 * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}. 112 * 113 * @implNote 114 * The implementation attempts to cast to a Spliterator.OfDouble, and throws 115 * an exception if this cast is not possible. 116 */ adapt(Spliterator<Double> s)117 private static Spliterator.OfDouble adapt(Spliterator<Double> s) { 118 if (s instanceof Spliterator.OfDouble) { 119 return (Spliterator.OfDouble) s; 120 } else { 121 if (Tripwire.ENABLED) 122 Tripwire.trip(AbstractPipeline.class, 123 "using DoubleStream.adapt(Spliterator<Double> s)"); 124 throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)"); 125 } 126 } 127 128 129 // Shape-specific methods 130 131 @Override 132 // Android-changed: Make public, to match the method it's overriding. getOutputShape()133 public final StreamShape getOutputShape() { 134 return StreamShape.DOUBLE_VALUE; 135 } 136 137 @Override 138 // Android-changed: Make public, to match the method it's overriding. evaluateToNode(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Double[]> generator)139 public final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper, 140 Spliterator<P_IN> spliterator, 141 boolean flattenTree, 142 IntFunction<Double[]> generator) { 143 return Nodes.collectDouble(helper, spliterator, flattenTree); 144 } 145 146 @Override 147 // Android-changed: Make public, to match the method it's overriding. wrap(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)148 public final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph, 149 Supplier<Spliterator<P_IN>> supplier, 150 boolean isParallel) { 151 return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel); 152 } 153 154 @Override 155 @SuppressWarnings("unchecked") 156 // Android-changed: Make public, to match the method it's overriding. lazySpliterator(Supplier<? extends Spliterator<Double>> supplier)157 public final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) { 158 return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier); 159 } 160 161 @Override 162 // Android-changed: Make public, to match the method it's overriding. forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink)163 public final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) { 164 Spliterator.OfDouble spl = adapt(spliterator); 165 DoubleConsumer adaptedSink = adapt(sink); 166 boolean cancelled; 167 do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); 168 return cancelled; 169 } 170 171 @Override 172 // Android-changed: Make public, to match the method it's overriding. makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator)173 public final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) { 174 return Nodes.doubleBuilder(exactSizeIfKnown); 175 } 176 mapToObj(DoubleFunction<? extends U> mapper, int opFlags)177 private <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper, int opFlags) { 178 return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, opFlags) { 179 @Override 180 // Android-changed: Make public, to match the method it's overriding. 181 public Sink<Double> opWrapSink(int flags, Sink<U> sink) { 182 return new Sink.ChainedDouble<U>(sink) { 183 @Override 184 public void accept(double t) { 185 downstream.accept(mapper.apply(t)); 186 } 187 }; 188 } 189 }; 190 } 191 192 // DoubleStream 193 194 @Override 195 public final PrimitiveIterator.OfDouble iterator() { 196 return Spliterators.iterator(spliterator()); 197 } 198 199 @Override 200 public final Spliterator.OfDouble spliterator() { 201 return adapt(super.spliterator()); 202 } 203 204 // Stateless intermediate ops from DoubleStream 205 206 @Override 207 public final Stream<Double> boxed() { 208 return mapToObj(Double::valueOf, 0); 209 } 210 211 @Override 212 public final DoubleStream map(DoubleUnaryOperator mapper) { 213 Objects.requireNonNull(mapper); 214 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 215 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 216 @Override 217 // Android-changed: Make public, to match the method it's overriding. 218 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 219 return new Sink.ChainedDouble<Double>(sink) { 220 @Override 221 public void accept(double t) { 222 downstream.accept(mapper.applyAsDouble(t)); 223 } 224 }; 225 } 226 }; 227 } 228 229 @Override 230 public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { 231 Objects.requireNonNull(mapper); 232 return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT); 233 } 234 235 @Override 236 public final IntStream mapToInt(DoubleToIntFunction mapper) { 237 Objects.requireNonNull(mapper); 238 return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 239 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 240 @Override 241 // Android-changed: Make public, to match the method it's overriding. 242 public Sink<Double> opWrapSink(int flags, Sink<Integer> sink) { 243 return new Sink.ChainedDouble<Integer>(sink) { 244 @Override 245 public void accept(double t) { 246 downstream.accept(mapper.applyAsInt(t)); 247 } 248 }; 249 } 250 }; 251 } 252 253 @Override 254 public final LongStream mapToLong(DoubleToLongFunction mapper) { 255 Objects.requireNonNull(mapper); 256 return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 257 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 258 @Override 259 // Android-changed: Make public, to match the method it's overriding. 260 public Sink<Double> opWrapSink(int flags, Sink<Long> sink) { 261 return new Sink.ChainedDouble<Long>(sink) { 262 @Override 263 public void accept(double t) { 264 downstream.accept(mapper.applyAsLong(t)); 265 } 266 }; 267 } 268 }; 269 } 270 271 @Override 272 public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { 273 Objects.requireNonNull(mapper); 274 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 275 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 276 @Override 277 // Android-changed: Make public, to match the method it's overriding. 278 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 279 return new Sink.ChainedDouble<Double>(sink) { 280 // true if cancellationRequested() has been called 281 boolean cancellationRequestedCalled; 282 283 // cache the consumer to avoid creation on every accepted element 284 DoubleConsumer downstreamAsDouble = downstream::accept; 285 286 @Override 287 public void begin(long size) { 288 downstream.begin(-1); 289 } 290 291 @Override 292 public void accept(double t) { 293 try (DoubleStream result = mapper.apply(t)) { 294 if (result != null) { 295 if (!cancellationRequestedCalled) { 296 result.sequential().forEach(downstreamAsDouble); 297 } 298 else { 299 var s = result.sequential().spliterator(); 300 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); 301 } 302 } 303 } 304 } 305 306 @Override 307 public boolean cancellationRequested() { 308 // If this method is called then an operation within the stream 309 // pipeline is short-circuiting (see AbstractPipeline.copyInto). 310 // Note that we cannot differentiate between an upstream or 311 // downstream operation 312 cancellationRequestedCalled = true; 313 return downstream.cancellationRequested(); 314 } 315 }; 316 } 317 }; 318 } 319 320 @Override 321 public final DoubleStream mapMulti(DoubleMapMultiConsumer mapper) { 322 Objects.requireNonNull(mapper); 323 return new StatelessOp<>(this, StreamShape.DOUBLE_VALUE, 324 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 325 326 @Override 327 // Android-changed: Make public, to match the method it's overriding. 328 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 329 return new Sink.ChainedDouble<>(sink) { 330 331 @Override 332 public void begin(long size) { 333 downstream.begin(-1); 334 } 335 336 @Override 337 @SuppressWarnings("unchecked") 338 public void accept(double t) { 339 mapper.accept(t, (DoubleConsumer) downstream); 340 } 341 }; 342 } 343 }; 344 } 345 346 @Override 347 public DoubleStream unordered() { 348 if (!isOrdered()) 349 return this; 350 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) { 351 @Override 352 // Android-changed: Make public, to match the method it's overriding. 353 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 354 return sink; 355 } 356 }; 357 } 358 359 @Override 360 public final DoubleStream filter(DoublePredicate predicate) { 361 Objects.requireNonNull(predicate); 362 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 363 StreamOpFlag.NOT_SIZED) { 364 @Override 365 // Android-changed: Make public, to match the method it's overriding. 366 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 367 return new Sink.ChainedDouble<Double>(sink) { 368 @Override 369 public void begin(long size) { 370 downstream.begin(-1); 371 } 372 373 @Override 374 public void accept(double t) { 375 if (predicate.test(t)) 376 downstream.accept(t); 377 } 378 }; 379 } 380 }; 381 } 382 383 @Override 384 public final DoubleStream peek(DoubleConsumer action) { 385 Objects.requireNonNull(action); 386 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 387 0) { 388 @Override 389 // Android-changed: Make public, to match the method it's overriding. 390 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 391 return new Sink.ChainedDouble<Double>(sink) { 392 @Override 393 public void accept(double t) { 394 action.accept(t); 395 downstream.accept(t); 396 } 397 }; 398 } 399 }; 400 } 401 402 // Stateful intermediate ops from DoubleStream 403 404 @Override 405 public final DoubleStream limit(long maxSize) { 406 if (maxSize < 0) 407 throw new IllegalArgumentException(Long.toString(maxSize)); 408 return SliceOps.makeDouble(this, (long) 0, maxSize); 409 } 410 411 @Override 412 public final DoubleStream skip(long n) { 413 if (n < 0) 414 throw new IllegalArgumentException(Long.toString(n)); 415 if (n == 0) 416 return this; 417 else { 418 long limit = -1; 419 return SliceOps.makeDouble(this, n, limit); 420 } 421 } 422 423 @Override 424 public final DoubleStream takeWhile(DoublePredicate predicate) { 425 return WhileOps.makeTakeWhileDouble(this, predicate); 426 } 427 428 @Override 429 public final DoubleStream dropWhile(DoublePredicate predicate) { 430 return WhileOps.makeDropWhileDouble(this, predicate); 431 } 432 433 @Override 434 public final DoubleStream sorted() { 435 return SortedOps.makeDouble(this); 436 } 437 438 @Override 439 public final DoubleStream distinct() { 440 // While functional and quick to implement, this approach is not very efficient. 441 // An efficient version requires a double-specific map/set implementation. 442 return boxed().distinct().mapToDouble(i -> (double) i); 443 } 444 445 // Terminal ops from DoubleStream 446 447 @Override 448 public void forEach(DoubleConsumer consumer) { 449 evaluate(ForEachOps.makeDouble(consumer, false)); 450 } 451 452 @Override 453 public void forEachOrdered(DoubleConsumer consumer) { 454 evaluate(ForEachOps.makeDouble(consumer, true)); 455 } 456 457 @Override 458 public final double sum() { 459 /* 460 * In the arrays allocated for the collect operation, index 0 461 * holds the high-order bits of the running sum, index 1 holds 462 * the negated low-order bits of the sum computed via compensated 463 * summation, and index 2 holds the simple sum used to compute 464 * the proper result if the stream contains infinite values of 465 * the same sign. 466 */ 467 double[] summation = collect(() -> new double[3], 468 (ll, d) -> { 469 Collectors.sumWithCompensation(ll, d); 470 ll[2] += d; 471 }, 472 (ll, rr) -> { 473 Collectors.sumWithCompensation(ll, rr[0]); 474 // Subtract compensation bits 475 Collectors.sumWithCompensation(ll, -rr[1]); 476 ll[2] += rr[2]; 477 }); 478 479 return Collectors.computeFinalSum(summation); 480 } 481 482 @Override 483 public final OptionalDouble min() { 484 return reduce(Math::min); 485 } 486 487 @Override 488 public final OptionalDouble max() { 489 return reduce(Math::max); 490 } 491 492 /** 493 * {@inheritDoc} 494 * 495 * @implNote The {@code double} format can represent all 496 * consecutive integers in the range -2<sup>53</sup> to 497 * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup> 498 * values, the divisor in the average computation will saturate at 499 * 2<sup>53</sup>, leading to additional numerical errors. 500 */ 501 @Override 502 public final OptionalDouble average() { 503 /* 504 * In the arrays allocated for the collect operation, index 0 505 * holds the high-order bits of the running sum, index 1 holds 506 * the low-order bits of the sum computed via compensated 507 * summation, index 2 holds the number of values seen, index 3 508 * holds the simple sum. 509 */ 510 double[] avg = collect(() -> new double[4], 511 (ll, d) -> { 512 ll[2]++; 513 Collectors.sumWithCompensation(ll, d); 514 ll[3] += d; 515 }, 516 (ll, rr) -> { 517 Collectors.sumWithCompensation(ll, rr[0]); 518 // Subtract compensation bits 519 Collectors.sumWithCompensation(ll, -rr[1]); 520 ll[2] += rr[2]; 521 ll[3] += rr[3]; 522 }); 523 return avg[2] > 0 524 ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2]) 525 : OptionalDouble.empty(); 526 } 527 528 @Override 529 public final long count() { 530 return evaluate(ReduceOps.makeDoubleCounting()); 531 } 532 533 @Override 534 public final DoubleSummaryStatistics summaryStatistics() { 535 return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept, 536 DoubleSummaryStatistics::combine); 537 } 538 539 @Override 540 public final double reduce(double identity, DoubleBinaryOperator op) { 541 return evaluate(ReduceOps.makeDouble(identity, op)); 542 } 543 544 @Override 545 public final OptionalDouble reduce(DoubleBinaryOperator op) { 546 return evaluate(ReduceOps.makeDouble(op)); 547 } 548 549 @Override 550 public final <R> R collect(Supplier<R> supplier, 551 ObjDoubleConsumer<R> accumulator, 552 BiConsumer<R, R> combiner) { 553 Objects.requireNonNull(combiner); 554 BinaryOperator<R> operator = (left, right) -> { 555 combiner.accept(left, right); 556 return left; 557 }; 558 return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator)); 559 } 560 561 @Override 562 public final boolean anyMatch(DoublePredicate predicate) { 563 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY)); 564 } 565 566 @Override 567 public final boolean allMatch(DoublePredicate predicate) { 568 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL)); 569 } 570 571 @Override 572 public final boolean noneMatch(DoublePredicate predicate) { 573 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE)); 574 } 575 576 @Override 577 public final OptionalDouble findFirst() { 578 return evaluate(FindOps.makeDouble(true)); 579 } 580 581 @Override 582 public final OptionalDouble findAny() { 583 return evaluate(FindOps.makeDouble(false)); 584 } 585 586 @Override 587 public final double[] toArray() { 588 return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new)) 589 .asPrimitiveArray(); 590 } 591 592 // 593 594 /** 595 * Source stage of a DoubleStream 596 * 597 * @param <E_IN> type of elements in the upstream source 598 * @hide Made public for CTS tests only (OpenJDK 8 streams tests). 599 */ 600 // Android-changed: Made public for CTS tests only. 601 public static class Head<E_IN> extends DoublePipeline<E_IN> { 602 /** 603 * Constructor for the source stage of a DoubleStream. 604 * 605 * @param source {@code Supplier<Spliterator>} describing the stream 606 * source 607 * @param sourceFlags the source flags for the stream source, described 608 * in {@link StreamOpFlag} 609 * @param parallel {@code true} if the pipeline is parallel 610 */ 611 // Android-changed: Made public for CTS tests only. 612 public Head(Supplier<? extends Spliterator<Double>> source, 613 int sourceFlags, boolean parallel) { 614 super(source, sourceFlags, parallel); 615 } 616 617 /** 618 * Constructor for the source stage of a DoubleStream. 619 * 620 * @param source {@code Spliterator} describing the stream source 621 * @param sourceFlags the source flags for the stream source, described 622 * in {@link StreamOpFlag} 623 * @param parallel {@code true} if the pipeline is parallel 624 */ 625 // Android-changed: Made public for CTS tests only. 626 public Head(Spliterator<Double> source, 627 int sourceFlags, boolean parallel) { 628 super(source, sourceFlags, parallel); 629 } 630 631 @Override 632 // Android-changed: Made public for CTS tests only. 633 public final boolean opIsStateful() { 634 throw new UnsupportedOperationException(); 635 } 636 637 @Override 638 // Android-changed: Made public for CTS tests only. 639 public final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) { 640 throw new UnsupportedOperationException(); 641 } 642 643 // Optimized sequential terminal operations for the head of the pipeline 644 645 @Override 646 public void forEach(DoubleConsumer consumer) { 647 if (!isParallel()) { 648 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 649 } 650 else { 651 super.forEach(consumer); 652 } 653 } 654 655 @Override 656 public void forEachOrdered(DoubleConsumer consumer) { 657 if (!isParallel()) { 658 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 659 } 660 else { 661 super.forEachOrdered(consumer); 662 } 663 } 664 665 } 666 667 /** 668 * Base class for a stateless intermediate stage of a DoubleStream. 669 * 670 * @param <E_IN> type of elements in the upstream source 671 * @since 1.8 672 * @hide Visible for CTS testing only (OpenJDK8 tests). 673 */ 674 // Android-changed: Made public for CTS tests only. 675 public abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> { 676 /** 677 * Construct a new DoubleStream by appending a stateless intermediate 678 * operation to an existing stream. 679 * 680 * @param upstream the upstream pipeline stage 681 * @param inputShape the stream shape for the upstream pipeline stage 682 * @param opFlags operation flags for the new stage 683 */ 684 // Android-changed: Made public for CTS tests only. 685 public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 686 StreamShape inputShape, 687 int opFlags) { 688 super(upstream, opFlags); 689 assert upstream.getOutputShape() == inputShape; 690 } 691 692 @Override 693 // Android-changed: Make public, to match the method it's overriding. 694 public final boolean opIsStateful() { 695 return false; 696 } 697 } 698 699 /** 700 * Base class for a stateful intermediate stage of a DoubleStream. 701 * 702 * @param <E_IN> type of elements in the upstream source 703 * @since 1.8 704 * @hide Visible for CTS testing only (OpenJDK8 tests). 705 */ 706 // Android-changed: Made public for CTS tests only. 707 public abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> { 708 /** 709 * Construct a new DoubleStream by appending a stateful intermediate 710 * operation to an existing stream. 711 * 712 * @param upstream the upstream pipeline stage 713 * @param inputShape the stream shape for the upstream pipeline stage 714 * @param opFlags operation flags for the new stage 715 */ 716 // Android-changed: Made public for CTS tests only. 717 public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 718 StreamShape inputShape, 719 int opFlags) { 720 super(upstream, opFlags); 721 assert upstream.getOutputShape() == inputShape; 722 } 723 724 @Override 725 // Android-changed: Make public, to match the method it's overriding. 726 public final boolean opIsStateful() { 727 return true; 728 } 729 730 @Override 731 // Android-changed: Make public, to match the method it's overriding. 732 public abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 733 Spliterator<P_IN> spliterator, 734 IntFunction<Double[]> generator); 735 } 736 } 737