1 /* 2 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.ArrayList; 28 import java.util.Arrays; 29 import java.util.Comparator; 30 import java.util.Objects; 31 import java.util.Spliterator; 32 import java.util.function.IntFunction; 33 34 35 /** 36 * Factory methods for transforming streams into sorted streams. 37 * 38 * @since 1.8 39 */ 40 final class SortedOps { 41 SortedOps()42 private SortedOps() { } 43 44 /** 45 * Appends a "sorted" operation to the provided stream. 46 * 47 * @param <T> the type of both input and output elements 48 * @param upstream a reference stream with element type T 49 */ makeRef(AbstractPipeline<?, T, ?> upstream)50 static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) { 51 return new OfRef<>(upstream); 52 } 53 54 /** 55 * Appends a "sorted" operation to the provided stream. 56 * 57 * @param <T> the type of both input and output elements 58 * @param upstream a reference stream with element type T 59 * @param comparator the comparator to order elements by 60 */ makeRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator)61 static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 62 Comparator<? super T> comparator) { 63 return new OfRef<>(upstream, comparator); 64 } 65 66 /** 67 * Appends a "sorted" operation to the provided stream. 68 * 69 * @param <T> the type of both input and output elements 70 * @param upstream a reference stream with element type T 71 */ makeInt(AbstractPipeline<?, Integer, ?> upstream)72 static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) { 73 return new OfInt(upstream); 74 } 75 76 /** 77 * Appends a "sorted" operation to the provided stream. 78 * 79 * @param <T> the type of both input and output elements 80 * @param upstream a reference stream with element type T 81 */ makeLong(AbstractPipeline<?, Long, ?> upstream)82 static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) { 83 return new OfLong(upstream); 84 } 85 86 /** 87 * Appends a "sorted" operation to the provided stream. 88 * 89 * @param <T> the type of both input and output elements 90 * @param upstream a reference stream with element type T 91 */ makeDouble(AbstractPipeline<?, Double, ?> upstream)92 static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) { 93 return new OfDouble(upstream); 94 } 95 96 /** 97 * Specialized subtype for sorting reference streams 98 */ 99 private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { 100 /** 101 * Comparator used for sorting 102 */ 103 private final boolean isNaturalSort; 104 private final Comparator<? super T> comparator; 105 106 /** 107 * Sort using natural order of {@literal <T>} which must be 108 * {@code Comparable}. 109 */ OfRef(AbstractPipeline<?, T, ?> upstream)110 OfRef(AbstractPipeline<?, T, ?> upstream) { 111 super(upstream, StreamShape.REFERENCE, 112 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 113 this.isNaturalSort = true; 114 // Will throw CCE when we try to sort if T is not Comparable 115 @SuppressWarnings("unchecked") 116 Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder(); 117 this.comparator = comp; 118 } 119 120 /** 121 * Sort using the provided comparator. 122 * 123 * @param comparator The comparator to be used to evaluate ordering. 124 */ OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator)125 OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { 126 super(upstream, StreamShape.REFERENCE, 127 StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); 128 this.isNaturalSort = false; 129 this.comparator = Objects.requireNonNull(comparator); 130 } 131 132 @Override opWrapSink(int flags, Sink<T> sink)133 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 134 Objects.requireNonNull(sink); 135 136 // If the input is already naturally sorted and this operation 137 // also naturally sorted then this is a no-op 138 if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) 139 return sink; 140 else if (StreamOpFlag.SIZED.isKnown(flags)) 141 return new SizedRefSortingSink<>(sink, comparator); 142 else 143 return new RefSortingSink<>(sink, comparator); 144 } 145 146 @Override opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator)147 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 148 Spliterator<P_IN> spliterator, 149 IntFunction<T[]> generator) { 150 // If the input is already naturally sorted and this operation 151 // naturally sorts then collect the output 152 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) { 153 return helper.evaluate(spliterator, false, generator); 154 } 155 else { 156 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort 157 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); 158 Arrays.parallelSort(flattenedData, comparator); 159 return Nodes.node(flattenedData); 160 } 161 } 162 } 163 164 /** 165 * Specialized subtype for sorting int streams. 166 */ 167 private static final class OfInt extends IntPipeline.StatefulOp<Integer> { OfInt(AbstractPipeline<?, Integer, ?> upstream)168 OfInt(AbstractPipeline<?, Integer, ?> upstream) { 169 super(upstream, StreamShape.INT_VALUE, 170 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 171 } 172 173 @Override opWrapSink(int flags, Sink<Integer> sink)174 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 175 Objects.requireNonNull(sink); 176 177 if (StreamOpFlag.SORTED.isKnown(flags)) 178 return sink; 179 else if (StreamOpFlag.SIZED.isKnown(flags)) 180 return new SizedIntSortingSink(sink); 181 else 182 return new IntSortingSink(sink); 183 } 184 185 @Override opEvaluateParallel(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, IntFunction<Integer[]> generator)186 public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 187 Spliterator<P_IN> spliterator, 188 IntFunction<Integer[]> generator) { 189 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 190 return helper.evaluate(spliterator, false, generator); 191 } 192 else { 193 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator); 194 195 int[] content = n.asPrimitiveArray(); 196 Arrays.parallelSort(content); 197 198 return Nodes.node(content); 199 } 200 } 201 } 202 203 /** 204 * Specialized subtype for sorting long streams. 205 */ 206 private static final class OfLong extends LongPipeline.StatefulOp<Long> { OfLong(AbstractPipeline<?, Long, ?> upstream)207 OfLong(AbstractPipeline<?, Long, ?> upstream) { 208 super(upstream, StreamShape.LONG_VALUE, 209 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 210 } 211 212 @Override opWrapSink(int flags, Sink<Long> sink)213 public Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 214 Objects.requireNonNull(sink); 215 216 if (StreamOpFlag.SORTED.isKnown(flags)) 217 return sink; 218 else if (StreamOpFlag.SIZED.isKnown(flags)) 219 return new SizedLongSortingSink(sink); 220 else 221 return new LongSortingSink(sink); 222 } 223 224 @Override opEvaluateParallel(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, IntFunction<Long[]> generator)225 public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 226 Spliterator<P_IN> spliterator, 227 IntFunction<Long[]> generator) { 228 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 229 return helper.evaluate(spliterator, false, generator); 230 } 231 else { 232 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator); 233 234 long[] content = n.asPrimitiveArray(); 235 Arrays.parallelSort(content); 236 237 return Nodes.node(content); 238 } 239 } 240 } 241 242 /** 243 * Specialized subtype for sorting double streams. 244 */ 245 private static final class OfDouble extends DoublePipeline.StatefulOp<Double> { OfDouble(AbstractPipeline<?, Double, ?> upstream)246 OfDouble(AbstractPipeline<?, Double, ?> upstream) { 247 super(upstream, StreamShape.DOUBLE_VALUE, 248 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 249 } 250 251 @Override opWrapSink(int flags, Sink<Double> sink)252 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 253 Objects.requireNonNull(sink); 254 255 if (StreamOpFlag.SORTED.isKnown(flags)) 256 return sink; 257 else if (StreamOpFlag.SIZED.isKnown(flags)) 258 return new SizedDoubleSortingSink(sink); 259 else 260 return new DoubleSortingSink(sink); 261 } 262 263 @Override opEvaluateParallel(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, IntFunction<Double[]> generator)264 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 265 Spliterator<P_IN> spliterator, 266 IntFunction<Double[]> generator) { 267 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 268 return helper.evaluate(spliterator, false, generator); 269 } 270 else { 271 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator); 272 273 double[] content = n.asPrimitiveArray(); 274 Arrays.parallelSort(content); 275 276 return Nodes.node(content); 277 } 278 } 279 } 280 281 /** 282 * Abstract {@link Sink} for implementing sort on reference streams. 283 * 284 * <p> 285 * Note: documentation below applies to reference and all primitive sinks. 286 * <p> 287 * Sorting sinks first accept all elements, buffering then into an array 288 * or a re-sizable data structure, if the size of the pipeline is known or 289 * unknown respectively. At the end of the sink protocol those elements are 290 * sorted and then pushed downstream. 291 * This class records if {@link #cancellationRequested} is called. If so it 292 * can be inferred that the source pushing source elements into the pipeline 293 * knows that the pipeline is short-circuiting. In such cases sub-classes 294 * pushing elements downstream will preserve the short-circuiting protocol 295 * by calling {@code downstream.cancellationRequested()} and checking the 296 * result is {@code false} before an element is pushed. 297 * <p> 298 * Note that the above behaviour is an optimization for sorting with 299 * sequential streams. It is not an error that more elements, than strictly 300 * required to produce a result, may flow through the pipeline. This can 301 * occur, in general (not restricted to just sorting), for short-circuiting 302 * parallel pipelines. 303 */ 304 private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> { 305 protected final Comparator<? super T> comparator; 306 // @@@ could be a lazy final value, if/when support is added 307 // true if cancellationRequested() has been called 308 protected boolean cancellationRequestedCalled; 309 AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator)310 AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { 311 super(downstream); 312 this.comparator = comparator; 313 } 314 315 /** 316 * Records is cancellation is requested so short-circuiting behaviour 317 * can be preserved when the sorted elements are pushed downstream. 318 * 319 * @return false, as this sink never short-circuits. 320 */ 321 @Override cancellationRequested()322 public final boolean cancellationRequested() { 323 // If this method is called then an operation within the stream 324 // pipeline is short-circuiting (see AbstractPipeline.copyInto). 325 // Note that we cannot differentiate between an upstream or 326 // downstream operation 327 cancellationRequestedCalled = true; 328 return false; 329 } 330 } 331 332 /** 333 * {@link Sink} for implementing sort on SIZED reference streams. 334 */ 335 private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> { 336 private T[] array; 337 private int offset; 338 SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator)339 SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { 340 super(sink, comparator); 341 } 342 343 @Override 344 @SuppressWarnings("unchecked") begin(long size)345 public void begin(long size) { 346 if (size >= Nodes.MAX_ARRAY_SIZE) 347 throw new IllegalArgumentException(Nodes.BAD_SIZE); 348 array = (T[]) new Object[(int) size]; 349 } 350 351 @Override end()352 public void end() { 353 Arrays.sort(array, 0, offset, comparator); 354 downstream.begin(offset); 355 if (!cancellationRequestedCalled) { 356 for (int i = 0; i < offset; i++) 357 downstream.accept(array[i]); 358 } 359 else { 360 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 361 downstream.accept(array[i]); 362 } 363 downstream.end(); 364 array = null; 365 } 366 367 @Override accept(T t)368 public void accept(T t) { 369 array[offset++] = t; 370 } 371 } 372 373 /** 374 * {@link Sink} for implementing sort on reference streams. 375 */ 376 private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { 377 private ArrayList<T> list; 378 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator)379 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { 380 super(sink, comparator); 381 } 382 383 @Override begin(long size)384 public void begin(long size) { 385 if (size >= Nodes.MAX_ARRAY_SIZE) 386 throw new IllegalArgumentException(Nodes.BAD_SIZE); 387 list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>(); 388 } 389 390 @Override end()391 public void end() { 392 list.sort(comparator); 393 downstream.begin(list.size()); 394 if (!cancellationRequestedCalled) { 395 list.forEach(downstream::accept); 396 } 397 else { 398 for (T t : list) { 399 if (downstream.cancellationRequested()) break; 400 downstream.accept(t); 401 } 402 } 403 downstream.end(); 404 list = null; 405 } 406 407 @Override accept(T t)408 public void accept(T t) { 409 list.add(t); 410 } 411 } 412 413 /** 414 * Abstract {@link Sink} for implementing sort on int streams. 415 */ 416 private abstract static class AbstractIntSortingSink extends Sink.ChainedInt<Integer> { 417 // true if cancellationRequested() has been called 418 protected boolean cancellationRequestedCalled; 419 AbstractIntSortingSink(Sink<? super Integer> downstream)420 AbstractIntSortingSink(Sink<? super Integer> downstream) { 421 super(downstream); 422 } 423 424 @Override cancellationRequested()425 public final boolean cancellationRequested() { 426 cancellationRequestedCalled = true; 427 return false; 428 } 429 } 430 431 /** 432 * {@link Sink} for implementing sort on SIZED int streams. 433 */ 434 private static final class SizedIntSortingSink extends AbstractIntSortingSink { 435 private int[] array; 436 private int offset; 437 SizedIntSortingSink(Sink<? super Integer> downstream)438 SizedIntSortingSink(Sink<? super Integer> downstream) { 439 super(downstream); 440 } 441 442 @Override begin(long size)443 public void begin(long size) { 444 if (size >= Nodes.MAX_ARRAY_SIZE) 445 throw new IllegalArgumentException(Nodes.BAD_SIZE); 446 array = new int[(int) size]; 447 } 448 449 @Override end()450 public void end() { 451 Arrays.sort(array, 0, offset); 452 downstream.begin(offset); 453 if (!cancellationRequestedCalled) { 454 for (int i = 0; i < offset; i++) 455 downstream.accept(array[i]); 456 } 457 else { 458 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 459 downstream.accept(array[i]); 460 } 461 downstream.end(); 462 array = null; 463 } 464 465 @Override accept(int t)466 public void accept(int t) { 467 array[offset++] = t; 468 } 469 } 470 471 /** 472 * {@link Sink} for implementing sort on int streams. 473 */ 474 private static final class IntSortingSink extends AbstractIntSortingSink { 475 private SpinedBuffer.OfInt b; 476 IntSortingSink(Sink<? super Integer> sink)477 IntSortingSink(Sink<? super Integer> sink) { 478 super(sink); 479 } 480 481 @Override begin(long size)482 public void begin(long size) { 483 if (size >= Nodes.MAX_ARRAY_SIZE) 484 throw new IllegalArgumentException(Nodes.BAD_SIZE); 485 b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt(); 486 } 487 488 @Override end()489 public void end() { 490 int[] ints = b.asPrimitiveArray(); 491 Arrays.sort(ints); 492 downstream.begin(ints.length); 493 if (!cancellationRequestedCalled) { 494 for (int anInt : ints) 495 downstream.accept(anInt); 496 } 497 else { 498 for (int anInt : ints) { 499 if (downstream.cancellationRequested()) break; 500 downstream.accept(anInt); 501 } 502 } 503 downstream.end(); 504 } 505 506 @Override accept(int t)507 public void accept(int t) { 508 b.accept(t); 509 } 510 } 511 512 /** 513 * Abstract {@link Sink} for implementing sort on long streams. 514 */ 515 private abstract static class AbstractLongSortingSink extends Sink.ChainedLong<Long> { 516 // true if cancellationRequested() has been called 517 protected boolean cancellationRequestedCalled; 518 AbstractLongSortingSink(Sink<? super Long> downstream)519 AbstractLongSortingSink(Sink<? super Long> downstream) { 520 super(downstream); 521 } 522 523 @Override cancellationRequested()524 public final boolean cancellationRequested() { 525 cancellationRequestedCalled = true; 526 return false; 527 } 528 } 529 530 /** 531 * {@link Sink} for implementing sort on SIZED long streams. 532 */ 533 private static final class SizedLongSortingSink extends AbstractLongSortingSink { 534 private long[] array; 535 private int offset; 536 SizedLongSortingSink(Sink<? super Long> downstream)537 SizedLongSortingSink(Sink<? super Long> downstream) { 538 super(downstream); 539 } 540 541 @Override begin(long size)542 public void begin(long size) { 543 if (size >= Nodes.MAX_ARRAY_SIZE) 544 throw new IllegalArgumentException(Nodes.BAD_SIZE); 545 array = new long[(int) size]; 546 } 547 548 @Override end()549 public void end() { 550 Arrays.sort(array, 0, offset); 551 downstream.begin(offset); 552 if (!cancellationRequestedCalled) { 553 for (int i = 0; i < offset; i++) 554 downstream.accept(array[i]); 555 } 556 else { 557 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 558 downstream.accept(array[i]); 559 } 560 downstream.end(); 561 array = null; 562 } 563 564 @Override accept(long t)565 public void accept(long t) { 566 array[offset++] = t; 567 } 568 } 569 570 /** 571 * {@link Sink} for implementing sort on long streams. 572 */ 573 private static final class LongSortingSink extends AbstractLongSortingSink { 574 private SpinedBuffer.OfLong b; 575 LongSortingSink(Sink<? super Long> sink)576 LongSortingSink(Sink<? super Long> sink) { 577 super(sink); 578 } 579 580 @Override begin(long size)581 public void begin(long size) { 582 if (size >= Nodes.MAX_ARRAY_SIZE) 583 throw new IllegalArgumentException(Nodes.BAD_SIZE); 584 b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong(); 585 } 586 587 @Override end()588 public void end() { 589 long[] longs = b.asPrimitiveArray(); 590 Arrays.sort(longs); 591 downstream.begin(longs.length); 592 if (!cancellationRequestedCalled) { 593 for (long aLong : longs) 594 downstream.accept(aLong); 595 } 596 else { 597 for (long aLong : longs) { 598 if (downstream.cancellationRequested()) break; 599 downstream.accept(aLong); 600 } 601 } 602 downstream.end(); 603 } 604 605 @Override accept(long t)606 public void accept(long t) { 607 b.accept(t); 608 } 609 } 610 611 /** 612 * Abstract {@link Sink} for implementing sort on long streams. 613 */ 614 private abstract static class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> { 615 // true if cancellationRequested() has been called 616 protected boolean cancellationRequestedCalled; 617 AbstractDoubleSortingSink(Sink<? super Double> downstream)618 AbstractDoubleSortingSink(Sink<? super Double> downstream) { 619 super(downstream); 620 } 621 622 @Override cancellationRequested()623 public final boolean cancellationRequested() { 624 cancellationRequestedCalled = true; 625 return false; 626 } 627 } 628 629 /** 630 * {@link Sink} for implementing sort on SIZED double streams. 631 */ 632 private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink { 633 private double[] array; 634 private int offset; 635 SizedDoubleSortingSink(Sink<? super Double> downstream)636 SizedDoubleSortingSink(Sink<? super Double> downstream) { 637 super(downstream); 638 } 639 640 @Override begin(long size)641 public void begin(long size) { 642 if (size >= Nodes.MAX_ARRAY_SIZE) 643 throw new IllegalArgumentException(Nodes.BAD_SIZE); 644 array = new double[(int) size]; 645 } 646 647 @Override end()648 public void end() { 649 Arrays.sort(array, 0, offset); 650 downstream.begin(offset); 651 if (!cancellationRequestedCalled) { 652 for (int i = 0; i < offset; i++) 653 downstream.accept(array[i]); 654 } 655 else { 656 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 657 downstream.accept(array[i]); 658 } 659 downstream.end(); 660 array = null; 661 } 662 663 @Override accept(double t)664 public void accept(double t) { 665 array[offset++] = t; 666 } 667 } 668 669 /** 670 * {@link Sink} for implementing sort on double streams. 671 */ 672 private static final class DoubleSortingSink extends AbstractDoubleSortingSink { 673 private SpinedBuffer.OfDouble b; 674 DoubleSortingSink(Sink<? super Double> sink)675 DoubleSortingSink(Sink<? super Double> sink) { 676 super(sink); 677 } 678 679 @Override begin(long size)680 public void begin(long size) { 681 if (size >= Nodes.MAX_ARRAY_SIZE) 682 throw new IllegalArgumentException(Nodes.BAD_SIZE); 683 b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble(); 684 } 685 686 @Override end()687 public void end() { 688 double[] doubles = b.asPrimitiveArray(); 689 Arrays.sort(doubles); 690 downstream.begin(doubles.length); 691 if (!cancellationRequestedCalled) { 692 for (double aDouble : doubles) 693 downstream.accept(aDouble); 694 } 695 else { 696 for (double aDouble : doubles) { 697 if (downstream.cancellationRequested()) break; 698 downstream.accept(aDouble); 699 } 700 } 701 downstream.end(); 702 } 703 704 @Override accept(double t)705 public void accept(double t) { 706 b.accept(t); 707 } 708 } 709 } 710