• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package libcore.java.util.concurrent;
18 
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertSame;
22 import static org.junit.Assert.assertTrue;
23 
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CompletionStage;
26 import java.util.concurrent.Executor;
27 import java.util.function.BiConsumer;
28 import java.util.function.BiFunction;
29 import java.util.function.Consumer;
30 import java.util.function.Function;
31 import java.util.function.Supplier;
32 
33 import org.junit.Test;
34 import org.junit.runner.RunWith;
35 import org.junit.runners.JUnit4;
36 
37 @RunWith(JUnit4.class)
38 public class CompletionStageTest {
39 
40     static abstract class BaseStage<T> implements CompletionStage<T> {
41         private T resultObj = null;
42         private Throwable exception = null;
43         private Executor executor = null;
44 
BaseStage()45         BaseStage() {
46         }
47 
BaseStage(T result)48         BaseStage(T result) {
49             resultObj = result;
50         }
51 
BaseStage(Executor executor)52         BaseStage(Executor executor) {
53             this.executor = executor;
54         }
55 
hasExecutor()56         protected boolean hasExecutor() {
57             return executor != null;
58         }
59 
execute()60         protected void execute() throws Throwable {
61             executor.execute(() -> {
62                 try {
63                     resultObj = doGet();
64                 } catch (Throwable ex) {
65                     exception = ex;
66                 }
67             });
68             if (exception != null) {
69                 throw exception;
70             }
71         }
72 
isAsync()73         boolean isAsync() {
74             return false;
75         }
76 
completeExceptionally(Throwable ex)77         public void completeExceptionally(Throwable ex) {
78             this.exception = ex;
79         }
80 
doGet()81         protected abstract T doGet() throws Throwable;
82 
get()83         public T get() throws Throwable {
84             if (this.exception != null) {
85                 throw new TestCompletionException(exception);
86             }
87             if (resultObj != null) {
88                 return resultObj;
89             }
90 
91             if (!hasExecutor()) {
92                 resultObj = doGet();
93             } else {
94                 execute();
95             }
96             return resultObj;
97         }
98 
99         @Override
thenApply(Function<? super T,? extends U> fn)100         public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn) {
101             throw new UnsupportedOperationException();
102         }
103         @Override
thenApplyAsync(Function<? super T,? extends U> fn)104         public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn) {
105             throw new UnsupportedOperationException();
106         }
107         @Override
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)108         public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,
109                 Executor executor) {
110             throw new UnsupportedOperationException();
111         }
112         @Override
thenAccept(Consumer<? super T> action)113         public CompletionStage<Void> thenAccept(Consumer<? super T> action) {
114             throw new UnsupportedOperationException();
115         }
116         @Override
thenAcceptAsync(Consumer<? super T> action)117         public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) {
118             throw new UnsupportedOperationException();
119         }
120         @Override
thenAcceptAsync(Consumer<? super T> action, Executor executor)121         public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
122                 Executor executor) {
123             throw new UnsupportedOperationException();
124         }
125         @Override
thenRun(Runnable action)126         public CompletionStage<Void> thenRun(Runnable action) {
127             throw new UnsupportedOperationException();
128         }
129         @Override
thenRunAsync(Runnable action)130         public CompletionStage<Void> thenRunAsync(Runnable action) {
131             throw new UnsupportedOperationException();
132         }
133         @Override
thenRunAsync(Runnable action, Executor executor)134         public CompletionStage<Void> thenRunAsync(Runnable action,
135                 Executor executor) {
136             throw new UnsupportedOperationException();
137         }
138         @Override
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)139         public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
140                 BiFunction<? super T,? super U,? extends V> fn) {
141             throw new UnsupportedOperationException();
142         }
143         @Override
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)144         public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
145                 BiFunction<? super T,? super U,? extends V> fn) {
146             throw new UnsupportedOperationException();
147         }
148         @Override
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)149         public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
150                 BiFunction<? super T,? super U,? extends V> fn,
151                 Executor executor) {
152             throw new UnsupportedOperationException();
153         }
154         @Override
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)155         public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,
156                 BiConsumer<? super T, ? super U> action) {
157             throw new UnsupportedOperationException();
158         }
159         @Override
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)160         public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
161                 BiConsumer<? super T, ? super U> action) {
162             throw new UnsupportedOperationException();
163         }
164         @Override
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)165         public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
166                 BiConsumer<? super T, ? super U> action,
167                 Executor executor) {
168             throw new UnsupportedOperationException();
169         }
170         @Override
runAfterBoth(CompletionStage<?> other, Runnable action)171         public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
172                 Runnable action) {
173             throw new UnsupportedOperationException();
174         }
175         @Override
runAfterBothAsync(CompletionStage<?> other, Runnable action)176         public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
177                 Runnable action) {
178             throw new UnsupportedOperationException();
179         }
180         @Override
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)181         public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
182                 Runnable action,
183                 Executor executor) {
184             throw new UnsupportedOperationException();
185         }
186         @Override
applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)187         public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,
188                 Function<? super T, U> fn) {
189             throw new UnsupportedOperationException();
190         }
191         @Override
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)192         public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
193                 Function<? super T, U> fn) {
194             throw new UnsupportedOperationException();
195         }
196         @Override
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)197         public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
198                 Function<? super T, U> fn,
199                 Executor executor) {
200             throw new UnsupportedOperationException();
201         }
202         @Override
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)203         public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,
204                 Consumer<? super T> action) {
205             throw new UnsupportedOperationException();
206         }
207         @Override
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)208         public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
209                 Consumer<? super T> action) {
210             throw new UnsupportedOperationException();
211         }
212         @Override
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)213         public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
214                 Consumer<? super T> action,
215                 Executor executor) {
216             throw new UnsupportedOperationException();
217         }
218         @Override
runAfterEither(CompletionStage<?> other, Runnable action)219         public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
220                 Runnable action) {
221             throw new UnsupportedOperationException();
222         }
223         @Override
runAfterEitherAsync(CompletionStage<?> other, Runnable action)224         public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
225                 Runnable action) {
226             throw new UnsupportedOperationException();
227         }
228         @Override
runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)229         public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
230                 Runnable action,
231                 Executor executor) {
232             throw new UnsupportedOperationException();
233         }
234 
235         @Override
thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)236         public <U> CompletionStage<U> thenCompose(
237                 Function<? super T, ? extends CompletionStage<U>> fn) {
238             return new ComposeStage<T, U>(this, fn);
239         }
240 
241         @Override
thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)242         public <U> CompletionStage<U> thenComposeAsync(
243                 Function<? super T, ? extends CompletionStage<U>> fn) {
244             throw new UnsupportedOperationException();
245         }
246         @Override
thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)247         public <U> CompletionStage<U> thenComposeAsync(
248                 Function<? super T, ? extends CompletionStage<U>> fn,
249                 Executor executor) {
250             throw new UnsupportedOperationException();
251         }
252 
253         @Override
handle(BiFunction<? super T, Throwable, ? extends U> fn)254         public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
255             return new HandleStage<T, U>(this, fn);
256         }
257 
258         @Override
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)259         public <U> CompletionStage<U> handleAsync(
260                 BiFunction<? super T, Throwable, ? extends U> fn) {
261             return new AsyncHandleStage<T, U>(this, fn);
262         }
263 
264         @Override
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)265         public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,
266                 Executor executor) {
267             return new AsyncHandleStage<T, U>(this, fn, executor);
268         }
269 
270         @Override
whenComplete(BiConsumer<? super T, ? super Throwable> action)271         public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
272             throw new UnsupportedOperationException();
273         }
274         @Override
whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)275         public CompletionStage<T> whenCompleteAsync(
276                 BiConsumer<? super T, ? super Throwable> action) {
277             throw new UnsupportedOperationException();
278         }
279         @Override
whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)280         public CompletionStage<T> whenCompleteAsync(
281                 BiConsumer<? super T, ? super Throwable> action,
282                 Executor executor) {
283             throw new UnsupportedOperationException();
284         }
285         @Override
exceptionally(Function<Throwable, ? extends T> fn)286         public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn) {
287             throw new UnsupportedOperationException();
288         }
289         @Override
toCompletableFuture()290         public CompletableFuture<T> toCompletableFuture() {
291             throw new UnsupportedOperationException();
292         }
293     }
294 
295     static abstract class ChildStage<T, U> extends BaseStage<U> {
296         private final BaseStage<T> parent;
297 
ChildStage(BaseStage<T> parent)298         ChildStage(BaseStage<T> parent) {
299             this.parent = parent;
300         }
301 
ChildStage(BaseStage<T> parent, Executor executor)302         ChildStage(BaseStage<T> parent, Executor executor) {
303             super(executor);
304             this.parent = parent;
305         }
306 
parentGet()307         protected T parentGet() throws Throwable {
308             return parent.get();
309         }
310     }
311 
312     static class HandleStage<T, U> extends ChildStage<T, U> {
313         private final BiFunction<? super T, Throwable, ? extends U> fn;
314 
HandleStage(BaseStage<T> parent, BiFunction<? super T, Throwable, ? extends U> fn)315         HandleStage(BaseStage<T> parent,
316                 BiFunction<? super T, Throwable, ? extends U> fn) {
317             super(parent);
318             this.fn = fn;
319         }
320 
HandleStage(BaseStage<T> parent, BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)321         HandleStage(BaseStage<T> parent,
322                 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
323             super(parent, executor);
324             this.fn = fn;
325         }
326 
327         @Override
doGet()328         protected U doGet() throws Throwable {
329             T parentResult = null;
330             TestCompletionException thrownException = null;
331             try {
332                 parentResult = parentGet();
333             } catch (TestCompletionException ex) {
334                 thrownException = ex;
335                 if (isAsync()) {
336                     thrownException.markAsync();
337                 }
338             }
339 
340             return fn.apply(parentResult, thrownException);
341         }
342     }
343 
344     static class AsyncHandleStage<T, U> extends HandleStage<T, U> {
345 
AsyncHandleStage(BaseStage<T> parent, BiFunction<? super T, Throwable, ? extends U> fn)346         AsyncHandleStage(BaseStage<T> parent,
347                 BiFunction<? super T, Throwable, ? extends U> fn) {
348             super(parent, fn);
349         }
350 
AsyncHandleStage(BaseStage<T> parent, BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)351         AsyncHandleStage(BaseStage<T> parent,
352                 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
353             super(parent, fn, executor);
354         }
355 
356         @Override
isAsync()357         boolean isAsync() {
358             return true;
359         }
360     }
361 
362     static final class ComposeStage<T, U> extends ChildStage<T, U> {
363         private final Function<? super T, ? extends CompletionStage<U>> fn;
364 
ComposeStage(BaseStage<T> parent, Function<? super T, ? extends CompletionStage<U>> fn)365         ComposeStage(BaseStage<T> parent,
366                 Function<? super T, ? extends CompletionStage<U>> fn) {
367             super(parent);
368             this.fn = fn;
369         }
370 
371         @Override
doGet()372         protected U doGet() throws Throwable {
373             T parentResult = null;
374             parentResult = parentGet();
375             BaseStage<U> next = (BaseStage<U>) fn.apply(parentResult);
376             return next.get();
377         }
378     }
379 
380     static final class SupplierStage<T> extends BaseStage<T> {
381         private Supplier<T> fn = null;
382 
SupplierStage()383         SupplierStage() {
384         }
385 
SupplierStage(Supplier<T> fn)386         SupplierStage(Supplier<T> fn) {
387             this.fn = fn;
388         }
389 
390         @Override
doGet()391         public T doGet() throws Throwable {
392 
393             if (fn != null) {
394                 return fn.get();
395             }
396 
397             return null;
398         }
399     }
400 
401     static final class ResolvedStage<T> extends BaseStage<T> {
ResolvedStage(T result)402         ResolvedStage(T result) {
403             super(result);
404         }
405 
406         @Override
doGet()407         public T doGet() throws Throwable {
408             // This shouldn't be reached as resultObj is already set.
409             return null;
410         }
411     }
412 
413     static final class TestCompletionException extends Throwable {
414         private boolean asyncHandled = false;
415 
TestCompletionException(Throwable cause)416         TestCompletionException(Throwable cause) {
417             super(cause);
418         }
419 
markAsync()420         void markAsync() {
421             asyncHandled = true;
422         }
423 
isAsyncHandled()424         boolean isAsyncHandled() {
425             return asyncHandled;
426         }
427     }
428 
429     static final class TestExecutor implements Executor {
430         private int executions = 0;
431 
432         @Override
execute(Runnable command)433         public void execute(Runnable command) {
434             ++executions;
435             command.run();
436         }
437 
wasRun()438         boolean wasRun() {
439             return executions > 0;
440         }
441     }
442 
443     static abstract class ExceptionalFunction<T, U>
444                     implements Function<TestCompletionException, U> {
445         private int invocationCount = 0;
446         private Throwable exception = null;
447         private boolean asyncCall = false;
448         final private T resultObj;
449 
ExceptionalFunction(T result)450         ExceptionalFunction(T result) {
451             resultObj = result;
452         }
453 
markInvoked(TestCompletionException ex)454         protected void markInvoked(TestCompletionException ex) {
455             invocationCount++;
456             exception = ex.getCause();
457             if (ex.isAsyncHandled()) {
458                 markAsync();
459             }
460         }
461 
getResult()462         protected T getResult() {
463             return resultObj;
464         }
465 
wasInvoked()466         boolean wasInvoked() {
467             return invocationCount > 0;
468         }
469 
wasAsync()470         boolean wasAsync() {
471             return asyncCall;
472         }
473 
markAsync()474         void markAsync() {
475             asyncCall = true;
476         }
477 
getException()478         Throwable getException() {
479             return exception;
480         }
481     }
482 
483     static final class ExceptionalCompletionFunction<T> extends ExceptionalFunction<T, T> {
484 
ExceptionalCompletionFunction(T result)485         ExceptionalCompletionFunction(T result) {
486             super(result);
487         }
488 
489         @Override
apply(TestCompletionException ex)490         public T apply(TestCompletionException ex) {
491             markInvoked(ex);
492             return getResult();
493         }
494     }
495 
496     static final class ExceptionalCompletionStageFunction<T>
497                     extends ExceptionalFunction<T, CompletionStage<T>> {
498 
ExceptionalCompletionStageFunction(T result)499         ExceptionalCompletionStageFunction(T result) {
500             super(result);
501         }
502 
503         @Override
apply(TestCompletionException ex)504         public CompletionStage<T> apply(TestCompletionException ex) {
505             markInvoked(ex);
506             return new ResolvedStage<T>(getResult());
507         }
508     }
509 
510     /**
511      * default exceptionallyAsync will provide for a stage with a new CompletionStage which will
512      * run asynchronously if the stage completes exceptionally.
513      *
514      * Validate that the supplied function was run on exceptional completion and it received the
515      * original exception.
516      */
517     @Test
testExceptionallyAsync()518     public void testExceptionallyAsync() throws Throwable {
519         BaseStage stage = new SupplierStage();
520         Object expectedResult = new Object();
521         Throwable expectedException = new RuntimeException();
522         ExceptionalCompletionFunction fn = new ExceptionalCompletionFunction(expectedResult);
523 
524         stage.completeExceptionally(expectedException);
525         stage = (BaseStage) stage.exceptionallyAsync(fn);
526         Object result = stage.get();
527 
528         assertTrue(fn.wasInvoked());
529         assertTrue(fn.wasAsync());
530         assertSame(expectedException, fn.getException());
531         assertSame(expectedResult, result);
532     }
533 
534     /**
535      * default exceptionallyAsync will provide for a stage with a new CompletionStage which will
536      * run asynchronously if the stage completes exceptionally. The new CompletionStage will be run
537      * in a provided executor.
538      *
539      * Validate that the supplied function was run on exceptional completion and it received the
540      * original exception. The run of the function occurred within the provided executor.
541      */
542     @Test
testExceptionallyAsyncWithExecutor()543     public void testExceptionallyAsyncWithExecutor() throws Throwable {
544         BaseStage stage = new SupplierStage();
545         Object expectedResult = new Object();
546         Throwable expectedException = new RuntimeException();
547         ExceptionalCompletionFunction fn = new ExceptionalCompletionFunction(expectedResult);
548         TestExecutor executor = new TestExecutor();
549 
550         stage.completeExceptionally(expectedException);
551         stage = (BaseStage) stage.exceptionallyAsync(fn, executor);
552         Object result = stage.get();
553 
554         assertTrue(fn.wasInvoked());
555         assertTrue(fn.wasAsync());
556         assertSame(expectedException, fn.getException());
557         assertSame(expectedResult, result);
558         assertTrue(executor.wasRun());
559     }
560 
561     /**
562      * default exceptionallyComposeAsync will compose a stage with a new CompletionStage which will
563      * run if the stage completes exceptionally.
564      *
565      * Validate that the supplied function was run on exceptional completion and it received the
566      * original exception.
567      */
568     @Test
testExceptionallyCompose()569     public void testExceptionallyCompose() throws Throwable {
570         BaseStage stage = new SupplierStage();
571         Object expectedResult = new Object();
572         Throwable expectedException = new RuntimeException();
573         ExceptionalCompletionStageFunction fn = new ExceptionalCompletionStageFunction(expectedResult);
574 
575         stage.completeExceptionally(expectedException);
576         stage = (BaseStage) stage.exceptionallyCompose(fn);
577         Object result = stage.get();
578 
579         assertTrue(fn.wasInvoked());
580         assertFalse(fn.wasAsync());
581         assertSame(expectedException, fn.getException());
582         assertSame(expectedResult, result);
583     }
584 
585     /**
586      * default exceptionallyComposeAsync will compose a stage with a new CompletionStage which will
587      * run asynchronously if the stage completes exceptionally.
588      *
589      * Validate that the supplied function was run on exceptional completion and it received the
590      * original exception.
591      */
592     @Test
testExceptionallyComposeAsync()593     public void testExceptionallyComposeAsync() throws Throwable {
594         BaseStage stage = new SupplierStage();
595         Object expectedResult = new Object();
596         Throwable expectedException = new RuntimeException();
597         ExceptionalCompletionStageFunction fn = new ExceptionalCompletionStageFunction(expectedResult);
598 
599         stage.completeExceptionally(expectedException);
600         stage = (BaseStage) stage.exceptionallyComposeAsync(fn);
601         Object result = stage.get();
602 
603         assertTrue(fn.wasInvoked());
604         assertTrue(fn.wasAsync());
605         assertSame(expectedException, fn.getException());
606         assertSame(expectedResult, result);
607     }
608 
609     /**
610      * default exceptionallyComposeAsync will compose a stage with a new CompletionStage which will
611      * run asynchronously if the stage completes exceptionally. The new CompletionStage will be run
612      * in a provided executor.
613      *
614      * Validate that the supplied function was run on exceptional completion and it received the
615      * original exception. The run of the function occurred within the provided executor.
616      */
617     @Test
testExceptionallyComposeAsyncWithExecutor()618     public void testExceptionallyComposeAsyncWithExecutor() throws Throwable {
619         BaseStage stage = new SupplierStage();
620         Object expectedResult = new Object();
621         Throwable expectedException = new RuntimeException();
622         ExceptionalCompletionStageFunction fn = new ExceptionalCompletionStageFunction(expectedResult);
623         TestExecutor executor = new TestExecutor();
624 
625         stage.completeExceptionally(expectedException);
626         stage = (BaseStage) stage.exceptionallyComposeAsync(fn, executor);
627         Object result = stage.get();
628 
629         assertTrue(fn.wasInvoked());
630         assertTrue(fn.wasAsync());
631         assertSame(expectedException, fn.getException());
632         assertSame(expectedResult, result);
633         assertTrue(executor.wasRun());
634     }
635 
636 }
637