1 /*
2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Comparator;
30 import java.util.Objects;
31 import java.util.Spliterator;
32 import java.util.function.IntFunction;
33 
34 
35 /**
36  * Factory methods for transforming streams into sorted streams.
37  *
38  * @since 1.8
39  */
40 final class SortedOps {
41 
SortedOps()42     private SortedOps() { }
43 
44     /**
45      * Appends a "sorted" operation to the provided stream.
46      *
47      * @param <T> the type of both input and output elements
48      * @param upstream a reference stream with element type T
49      */
makeRef(AbstractPipeline<?, T, ?> upstream)50     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
51         return new OfRef<>(upstream);
52     }
53 
54     /**
55      * Appends a "sorted" operation to the provided stream.
56      *
57      * @param <T> the type of both input and output elements
58      * @param upstream a reference stream with element type T
59      * @param comparator the comparator to order elements by
60      */
makeRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator)61     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
62                                 Comparator<? super T> comparator) {
63         return new OfRef<>(upstream, comparator);
64     }
65 
66     /**
67      * Appends a "sorted" operation to the provided stream.
68      *
69      * @param <T> the type of both input and output elements
70      * @param upstream a reference stream with element type T
71      */
makeInt(AbstractPipeline<?, Integer, ?> upstream)72     static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
73         return new OfInt(upstream);
74     }
75 
76     /**
77      * Appends a "sorted" operation to the provided stream.
78      *
79      * @param <T> the type of both input and output elements
80      * @param upstream a reference stream with element type T
81      */
makeLong(AbstractPipeline<?, Long, ?> upstream)82     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
83         return new OfLong(upstream);
84     }
85 
86     /**
87      * Appends a "sorted" operation to the provided stream.
88      *
89      * @param <T> the type of both input and output elements
90      * @param upstream a reference stream with element type T
91      */
makeDouble(AbstractPipeline<?, Double, ?> upstream)92     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
93         return new OfDouble(upstream);
94     }
95 
96     /**
97      * Specialized subtype for sorting reference streams
98      */
99     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
100         /**
101          * Comparator used for sorting
102          */
103         private final boolean isNaturalSort;
104         private final Comparator<? super T> comparator;
105 
106         /**
107          * Sort using natural order of {@literal <T>} which must be
108          * {@code Comparable}.
109          */
OfRef(AbstractPipeline<?, T, ?> upstream)110         OfRef(AbstractPipeline<?, T, ?> upstream) {
111             super(upstream, StreamShape.REFERENCE,
112                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
113             this.isNaturalSort = true;
114             // Will throw CCE when we try to sort if T is not Comparable
115             @SuppressWarnings("unchecked")
116             Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
117             this.comparator = comp;
118         }
119 
120         /**
121          * Sort using the provided comparator.
122          *
123          * @param comparator The comparator to be used to evaluate ordering.
124          */
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator)125         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
126             super(upstream, StreamShape.REFERENCE,
127                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
128             this.isNaturalSort = false;
129             this.comparator = Objects.requireNonNull(comparator);
130         }
131 
132         @Override
opWrapSink(int flags, Sink<T> sink)133         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
134             Objects.requireNonNull(sink);
135 
136             // If the input is already naturally sorted and this operation
137             // also naturally sorted then this is a no-op
138             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
139                 return sink;
140             else if (StreamOpFlag.SIZED.isKnown(flags))
141                 return new SizedRefSortingSink<>(sink, comparator);
142             else
143                 return new RefSortingSink<>(sink, comparator);
144         }
145 
146         @Override
opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator)147         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
148                                                  Spliterator<P_IN> spliterator,
149                                                  IntFunction<T[]> generator) {
150             // If the input is already naturally sorted and this operation
151             // naturally sorts then collect the output
152             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
153                 return helper.evaluate(spliterator, false, generator);
154             }
155             else {
156                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
157                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
158                 Arrays.parallelSort(flattenedData, comparator);
159                 return Nodes.node(flattenedData);
160             }
161         }
162     }
163 
164     /**
165      * Specialized subtype for sorting int streams.
166      */
167     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
OfInt(AbstractPipeline<?, Integer, ?> upstream)168         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
169             super(upstream, StreamShape.INT_VALUE,
170                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
171         }
172 
173         @Override
opWrapSink(int flags, Sink<Integer> sink)174         public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
175             Objects.requireNonNull(sink);
176 
177             if (StreamOpFlag.SORTED.isKnown(flags))
178                 return sink;
179             else if (StreamOpFlag.SIZED.isKnown(flags))
180                 return new SizedIntSortingSink(sink);
181             else
182                 return new IntSortingSink(sink);
183         }
184 
185         @Override
opEvaluateParallel(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, IntFunction<Integer[]> generator)186         public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
187                                                        Spliterator<P_IN> spliterator,
188                                                        IntFunction<Integer[]> generator) {
189             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
190                 return helper.evaluate(spliterator, false, generator);
191             }
192             else {
193                 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
194 
195                 int[] content = n.asPrimitiveArray();
196                 Arrays.parallelSort(content);
197 
198                 return Nodes.node(content);
199             }
200         }
201     }
202 
203     /**
204      * Specialized subtype for sorting long streams.
205      */
206     private static final class OfLong extends LongPipeline.StatefulOp<Long> {
OfLong(AbstractPipeline<?, Long, ?> upstream)207         OfLong(AbstractPipeline<?, Long, ?> upstream) {
208             super(upstream, StreamShape.LONG_VALUE,
209                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
210         }
211 
212         @Override
opWrapSink(int flags, Sink<Long> sink)213         public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
214             Objects.requireNonNull(sink);
215 
216             if (StreamOpFlag.SORTED.isKnown(flags))
217                 return sink;
218             else if (StreamOpFlag.SIZED.isKnown(flags))
219                 return new SizedLongSortingSink(sink);
220             else
221                 return new LongSortingSink(sink);
222         }
223 
224         @Override
opEvaluateParallel(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, IntFunction<Long[]> generator)225         public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
226                                                     Spliterator<P_IN> spliterator,
227                                                     IntFunction<Long[]> generator) {
228             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
229                 return helper.evaluate(spliterator, false, generator);
230             }
231             else {
232                 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
233 
234                 long[] content = n.asPrimitiveArray();
235                 Arrays.parallelSort(content);
236 
237                 return Nodes.node(content);
238             }
239         }
240     }
241 
242     /**
243      * Specialized subtype for sorting double streams.
244      */
245     private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
OfDouble(AbstractPipeline<?, Double, ?> upstream)246         OfDouble(AbstractPipeline<?, Double, ?> upstream) {
247             super(upstream, StreamShape.DOUBLE_VALUE,
248                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
249         }
250 
251         @Override
opWrapSink(int flags, Sink<Double> sink)252         public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
253             Objects.requireNonNull(sink);
254 
255             if (StreamOpFlag.SORTED.isKnown(flags))
256                 return sink;
257             else if (StreamOpFlag.SIZED.isKnown(flags))
258                 return new SizedDoubleSortingSink(sink);
259             else
260                 return new DoubleSortingSink(sink);
261         }
262 
263         @Override
opEvaluateParallel(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, IntFunction<Double[]> generator)264         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
265                                                       Spliterator<P_IN> spliterator,
266                                                       IntFunction<Double[]> generator) {
267             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
268                 return helper.evaluate(spliterator, false, generator);
269             }
270             else {
271                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
272 
273                 double[] content = n.asPrimitiveArray();
274                 Arrays.parallelSort(content);
275 
276                 return Nodes.node(content);
277             }
278         }
279     }
280 
281     /**
282      * Abstract {@link Sink} for implementing sort on reference streams.
283      *
284      * <p>
285      * Note: documentation below applies to reference and all primitive sinks.
286      * <p>
287      * Sorting sinks first accept all elements, buffering then into an array
288      * or a re-sizable data structure, if the size of the pipeline is known or
289      * unknown respectively.  At the end of the sink protocol those elements are
290      * sorted and then pushed downstream.
291      * This class records if {@link #cancellationRequested} is called.  If so it
292      * can be inferred that the source pushing source elements into the pipeline
293      * knows that the pipeline is short-circuiting.  In such cases sub-classes
294      * pushing elements downstream will preserve the short-circuiting protocol
295      * by calling {@code downstream.cancellationRequested()} and checking the
296      * result is {@code false} before an element is pushed.
297      * <p>
298      * Note that the above behaviour is an optimization for sorting with
299      * sequential streams.  It is not an error that more elements, than strictly
300      * required to produce a result, may flow through the pipeline.  This can
301      * occur, in general (not restricted to just sorting), for short-circuiting
302      * parallel pipelines.
303      */
304     private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
305         protected final Comparator<? super T> comparator;
306         // @@@ could be a lazy final value, if/when support is added
307         // true if cancellationRequested() has been called
308         protected boolean cancellationRequestedCalled;
309 
AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator)310         AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
311             super(downstream);
312             this.comparator = comparator;
313         }
314 
315         /**
316          * Records is cancellation is requested so short-circuiting behaviour
317          * can be preserved when the sorted elements are pushed downstream.
318          *
319          * @return false, as this sink never short-circuits.
320          */
321         @Override
cancellationRequested()322         public final boolean cancellationRequested() {
323             // If this method is called then an operation within the stream
324             // pipeline is short-circuiting (see AbstractPipeline.copyInto).
325             // Note that we cannot differentiate between an upstream or
326             // downstream operation
327             cancellationRequestedCalled = true;
328             return false;
329         }
330     }
331 
332     /**
333      * {@link Sink} for implementing sort on SIZED reference streams.
334      */
335     private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
336         private T[] array;
337         private int offset;
338 
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator)339         SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
340             super(sink, comparator);
341         }
342 
343         @Override
344         @SuppressWarnings("unchecked")
begin(long size)345         public void begin(long size) {
346             if (size >= Nodes.MAX_ARRAY_SIZE)
347                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
348             array = (T[]) new Object[(int) size];
349         }
350 
351         @Override
end()352         public void end() {
353             Arrays.sort(array, 0, offset, comparator);
354             downstream.begin(offset);
355             if (!cancellationRequestedCalled) {
356                 for (int i = 0; i < offset; i++)
357                     downstream.accept(array[i]);
358             }
359             else {
360                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
361                     downstream.accept(array[i]);
362             }
363             downstream.end();
364             array = null;
365         }
366 
367         @Override
accept(T t)368         public void accept(T t) {
369             array[offset++] = t;
370         }
371     }
372 
373     /**
374      * {@link Sink} for implementing sort on reference streams.
375      */
376     private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
377         private ArrayList<T> list;
378 
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator)379         RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
380             super(sink, comparator);
381         }
382 
383         @Override
begin(long size)384         public void begin(long size) {
385             if (size >= Nodes.MAX_ARRAY_SIZE)
386                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
387             list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>();
388         }
389 
390         @Override
end()391         public void end() {
392             list.sort(comparator);
393             downstream.begin(list.size());
394             if (!cancellationRequestedCalled) {
395                 list.forEach(downstream::accept);
396             }
397             else {
398                 for (T t : list) {
399                     if (downstream.cancellationRequested()) break;
400                     downstream.accept(t);
401                 }
402             }
403             downstream.end();
404             list = null;
405         }
406 
407         @Override
accept(T t)408         public void accept(T t) {
409             list.add(t);
410         }
411     }
412 
413     /**
414      * Abstract {@link Sink} for implementing sort on int streams.
415      */
416     private abstract static class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
417         // true if cancellationRequested() has been called
418         protected boolean cancellationRequestedCalled;
419 
AbstractIntSortingSink(Sink<? super Integer> downstream)420         AbstractIntSortingSink(Sink<? super Integer> downstream) {
421             super(downstream);
422         }
423 
424         @Override
cancellationRequested()425         public final boolean cancellationRequested() {
426             cancellationRequestedCalled = true;
427             return false;
428         }
429     }
430 
431     /**
432      * {@link Sink} for implementing sort on SIZED int streams.
433      */
434     private static final class SizedIntSortingSink extends AbstractIntSortingSink {
435         private int[] array;
436         private int offset;
437 
SizedIntSortingSink(Sink<? super Integer> downstream)438         SizedIntSortingSink(Sink<? super Integer> downstream) {
439             super(downstream);
440         }
441 
442         @Override
begin(long size)443         public void begin(long size) {
444             if (size >= Nodes.MAX_ARRAY_SIZE)
445                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
446             array = new int[(int) size];
447         }
448 
449         @Override
end()450         public void end() {
451             Arrays.sort(array, 0, offset);
452             downstream.begin(offset);
453             if (!cancellationRequestedCalled) {
454                 for (int i = 0; i < offset; i++)
455                     downstream.accept(array[i]);
456             }
457             else {
458                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
459                     downstream.accept(array[i]);
460             }
461             downstream.end();
462             array = null;
463         }
464 
465         @Override
accept(int t)466         public void accept(int t) {
467             array[offset++] = t;
468         }
469     }
470 
471     /**
472      * {@link Sink} for implementing sort on int streams.
473      */
474     private static final class IntSortingSink extends AbstractIntSortingSink {
475         private SpinedBuffer.OfInt b;
476 
IntSortingSink(Sink<? super Integer> sink)477         IntSortingSink(Sink<? super Integer> sink) {
478             super(sink);
479         }
480 
481         @Override
begin(long size)482         public void begin(long size) {
483             if (size >= Nodes.MAX_ARRAY_SIZE)
484                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
485             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
486         }
487 
488         @Override
end()489         public void end() {
490             int[] ints = b.asPrimitiveArray();
491             Arrays.sort(ints);
492             downstream.begin(ints.length);
493             if (!cancellationRequestedCalled) {
494                 for (int anInt : ints)
495                     downstream.accept(anInt);
496             }
497             else {
498                 for (int anInt : ints) {
499                     if (downstream.cancellationRequested()) break;
500                     downstream.accept(anInt);
501                 }
502             }
503             downstream.end();
504         }
505 
506         @Override
accept(int t)507         public void accept(int t) {
508             b.accept(t);
509         }
510     }
511 
512     /**
513      * Abstract {@link Sink} for implementing sort on long streams.
514      */
515     private abstract static class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
516         // true if cancellationRequested() has been called
517         protected boolean cancellationRequestedCalled;
518 
AbstractLongSortingSink(Sink<? super Long> downstream)519         AbstractLongSortingSink(Sink<? super Long> downstream) {
520             super(downstream);
521         }
522 
523         @Override
cancellationRequested()524         public final boolean cancellationRequested() {
525             cancellationRequestedCalled = true;
526             return false;
527         }
528     }
529 
530     /**
531      * {@link Sink} for implementing sort on SIZED long streams.
532      */
533     private static final class SizedLongSortingSink extends AbstractLongSortingSink {
534         private long[] array;
535         private int offset;
536 
SizedLongSortingSink(Sink<? super Long> downstream)537         SizedLongSortingSink(Sink<? super Long> downstream) {
538             super(downstream);
539         }
540 
541         @Override
begin(long size)542         public void begin(long size) {
543             if (size >= Nodes.MAX_ARRAY_SIZE)
544                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
545             array = new long[(int) size];
546         }
547 
548         @Override
end()549         public void end() {
550             Arrays.sort(array, 0, offset);
551             downstream.begin(offset);
552             if (!cancellationRequestedCalled) {
553                 for (int i = 0; i < offset; i++)
554                     downstream.accept(array[i]);
555             }
556             else {
557                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
558                     downstream.accept(array[i]);
559             }
560             downstream.end();
561             array = null;
562         }
563 
564         @Override
accept(long t)565         public void accept(long t) {
566             array[offset++] = t;
567         }
568     }
569 
570     /**
571      * {@link Sink} for implementing sort on long streams.
572      */
573     private static final class LongSortingSink extends AbstractLongSortingSink {
574         private SpinedBuffer.OfLong b;
575 
LongSortingSink(Sink<? super Long> sink)576         LongSortingSink(Sink<? super Long> sink) {
577             super(sink);
578         }
579 
580         @Override
begin(long size)581         public void begin(long size) {
582             if (size >= Nodes.MAX_ARRAY_SIZE)
583                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
584             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
585         }
586 
587         @Override
end()588         public void end() {
589             long[] longs = b.asPrimitiveArray();
590             Arrays.sort(longs);
591             downstream.begin(longs.length);
592             if (!cancellationRequestedCalled) {
593                 for (long aLong : longs)
594                     downstream.accept(aLong);
595             }
596             else {
597                 for (long aLong : longs) {
598                     if (downstream.cancellationRequested()) break;
599                     downstream.accept(aLong);
600                 }
601             }
602             downstream.end();
603         }
604 
605         @Override
accept(long t)606         public void accept(long t) {
607             b.accept(t);
608         }
609     }
610 
611     /**
612      * Abstract {@link Sink} for implementing sort on long streams.
613      */
614     private abstract static class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
615         // true if cancellationRequested() has been called
616         protected boolean cancellationRequestedCalled;
617 
AbstractDoubleSortingSink(Sink<? super Double> downstream)618         AbstractDoubleSortingSink(Sink<? super Double> downstream) {
619             super(downstream);
620         }
621 
622         @Override
cancellationRequested()623         public final boolean cancellationRequested() {
624             cancellationRequestedCalled = true;
625             return false;
626         }
627     }
628 
629     /**
630      * {@link Sink} for implementing sort on SIZED double streams.
631      */
632     private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
633         private double[] array;
634         private int offset;
635 
SizedDoubleSortingSink(Sink<? super Double> downstream)636         SizedDoubleSortingSink(Sink<? super Double> downstream) {
637             super(downstream);
638         }
639 
640         @Override
begin(long size)641         public void begin(long size) {
642             if (size >= Nodes.MAX_ARRAY_SIZE)
643                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
644             array = new double[(int) size];
645         }
646 
647         @Override
end()648         public void end() {
649             Arrays.sort(array, 0, offset);
650             downstream.begin(offset);
651             if (!cancellationRequestedCalled) {
652                 for (int i = 0; i < offset; i++)
653                     downstream.accept(array[i]);
654             }
655             else {
656                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
657                     downstream.accept(array[i]);
658             }
659             downstream.end();
660             array = null;
661         }
662 
663         @Override
accept(double t)664         public void accept(double t) {
665             array[offset++] = t;
666         }
667     }
668 
669     /**
670      * {@link Sink} for implementing sort on double streams.
671      */
672     private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
673         private SpinedBuffer.OfDouble b;
674 
DoubleSortingSink(Sink<? super Double> sink)675         DoubleSortingSink(Sink<? super Double> sink) {
676             super(sink);
677         }
678 
679         @Override
begin(long size)680         public void begin(long size) {
681             if (size >= Nodes.MAX_ARRAY_SIZE)
682                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
683             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
684         }
685 
686         @Override
end()687         public void end() {
688             double[] doubles = b.asPrimitiveArray();
689             Arrays.sort(doubles);
690             downstream.begin(doubles.length);
691             if (!cancellationRequestedCalled) {
692                 for (double aDouble : doubles)
693                     downstream.accept(aDouble);
694             }
695             else {
696                 for (double aDouble : doubles) {
697                     if (downstream.cancellationRequested()) break;
698                     downstream.accept(aDouble);
699                 }
700             }
701             downstream.end();
702         }
703 
704         @Override
accept(double t)705         public void accept(double t) {
706             b.accept(t);
707         }
708     }
709 }
710