/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package libcore.java.util.concurrent;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for the {@code exceptionally*} methods of {@link CompletionStage} that
 * {@link BaseStage} does not override, so the interface's own implementations are the
 * code under test.
 *
 * <p>{@link BaseStage} only implements {@code handle}, {@code handleAsync} and
 * {@code thenCompose}; every other abstract method of the interface throws
 * {@link UnsupportedOperationException}. Stages are evaluated lazily and on the calling
 * thread when {@link BaseStage#get()} is invoked.
 */
@RunWith(JUnit4.class)
public class CompletionStageTest {

    /**
     * Minimal, lazily evaluated {@link CompletionStage} used as the test fixture.
     *
     * <p>The value is produced by {@link #doGet()} the first time {@link #get()} is called,
     * either directly or through the executor supplied at construction time.
     */
    abstract static class BaseStage<T> implements CompletionStage<T> {
        private final Executor executor;
        private T resultObj;
        private Throwable exception;

        BaseStage() {
            this.executor = null;
        }

        /** Creates a stage that already holds {@code result}. */
        BaseStage(T result) {
            this.executor = null;
            this.resultObj = result;
        }

        /** Creates a stage whose {@link #doGet()} is run through {@code executor}. */
        BaseStage(Executor executor) {
            this.executor = executor;
        }

        protected boolean hasExecutor() {
            return executor != null;
        }

        /**
         * Runs {@link #doGet()} through the executor, recording its result or failure.
         *
         * <p>The failure check happens immediately after {@code Executor.execute} returns,
         * so this only observes the outcome when the executor runs the command before
         * returning (as {@link TestExecutor} does).
         *
         * @throws Throwable whatever {@link #doGet()} threw, unwrapped
         */
        protected void execute() throws Throwable {
            executor.execute(() -> {
                try {
                    resultObj = doGet();
                } catch (Throwable ex) {
                    exception = ex;
                }
            });
            if (exception != null) {
                throw exception;
            }
        }

        /** Returns whether this stage represents an {@code *Async} variant. */
        boolean isAsync() {
            return false;
        }

        /** Marks this stage as failed; later calls to {@link #get()} will throw. */
        public void completeExceptionally(Throwable ex) {
            this.exception = ex;
        }

        /** Computes the value of this stage. */
        protected abstract T doGet() throws Throwable;

        /**
         * Returns the value of this stage, computing it on first use.
         *
         * <p>A non-null result is cached and returned on subsequent calls; a {@code null}
         * result is recomputed each time.
         *
         * @throws TestCompletionException wrapping the recorded failure, if the stage
         *         already holds one when this method is entered
         * @throws Throwable anything thrown while computing the value
         */
        public T get() throws Throwable {
            if (exception != null) {
                throw new TestCompletionException(exception);
            }
            if (resultObj != null) {
                return resultObj;
            }

            if (hasExecutor()) {
                execute();
            } else {
                resultObj = doGet();
            }
            return resultObj;
        }

        // ---- Operations that are not needed by the tests and are left unsupported. ----

        @Override
        public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> thenAccept(Consumer<? super T> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> thenRun(Runnable action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> thenRunAsync(Runnable action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> thenRunAsync(Runnable action,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
                BiFunction<? super T, ? super U, ? extends V> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
                BiFunction<? super T, ? super U, ? extends V> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
                BiFunction<? super T, ? super U, ? extends V> fn,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,
                BiConsumer<? super T, ? super U> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
                BiConsumer<? super T, ? super U> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
                BiConsumer<? super T, ? super U> action,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
                Runnable action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                Runnable action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                Runnable action,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,
                Function<? super T, U> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
                Function<? super T, U> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
                Function<? super T, U> fn,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,
                Consumer<? super T> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
                Consumer<? super T> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
                Consumer<? super T> action,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
                Runnable action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
                Runnable action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
                Runnable action,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        // ---- Operations implemented for the tests. ----

        @Override
        public <U> CompletionStage<U> thenCompose(
                Function<? super T, ? extends CompletionStage<U>> fn) {
            return new ComposeStage<T, U>(this, fn);
        }

        @Override
        public <U> CompletionStage<U> thenComposeAsync(
                Function<? super T, ? extends CompletionStage<U>> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<U> thenComposeAsync(
                Function<? super T, ? extends CompletionStage<U>> fn,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
            return new HandleStage<T, U>(this, fn);
        }

        @Override
        public <U> CompletionStage<U> handleAsync(
                BiFunction<? super T, Throwable, ? extends U> fn) {
            return new AsyncHandleStage<T, U>(this, fn);
        }

        @Override
        public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,
                Executor executor) {
            return new AsyncHandleStage<T, U>(this, fn, executor);
        }

        @Override
        public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<T> whenCompleteAsync(
                BiConsumer<? super T, ? super Throwable> action) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<T> whenCompleteAsync(
                BiConsumer<? super T, ? super Throwable> action,
                Executor executor) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn) {
            throw new UnsupportedOperationException();
        }

        @Override
        public CompletableFuture<T> toCompletableFuture() {
            throw new UnsupportedOperationException();
        }
    }

    /** A stage whose value is derived from the outcome of a parent stage. */
    abstract static class ChildStage<T, U> extends BaseStage<U> {
        private final BaseStage<T> parent;

        ChildStage(BaseStage<T> parent) {
            this.parent = parent;
        }

        ChildStage(BaseStage<T> parent, Executor executor) {
            super(executor);
            this.parent = parent;
        }

        /** Evaluates the parent stage, propagating whatever it throws. */
        protected T parentGet() throws Throwable {
            return parent.get();
        }
    }

    /**
     * Stage created by {@code handle}: applies {@code fn} to the parent's result, or to the
     * {@link TestCompletionException} the parent threw.
     */
    static class HandleStage<T, U> extends ChildStage<T, U> {
        private final BiFunction<? super T, Throwable, ? extends U> fn;

        HandleStage(BaseStage<T> parent,
                BiFunction<? super T, Throwable, ? extends U> fn) {
            super(parent);
            this.fn = fn;
        }

        HandleStage(BaseStage<T> parent,
                BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
            super(parent, executor);
            this.fn = fn;
        }

        @Override
        protected U doGet() throws Throwable {
            T parentResult = null;
            TestCompletionException thrownException = null;
            try {
                parentResult = parentGet();
            } catch (TestCompletionException ex) {
                thrownException = ex;
                // Tag the exception so the receiving function can tell it was delivered
                // through an async stage.
                if (isAsync()) {
                    thrownException.markAsync();
                }
            }

            return fn.apply(parentResult, thrownException);
        }
    }

    /** Stage created by {@code handleAsync}; identical to {@link HandleStage} but async. */
    static class AsyncHandleStage<T, U> extends HandleStage<T, U> {

        AsyncHandleStage(BaseStage<T> parent,
                BiFunction<? super T, Throwable, ? extends U> fn) {
            super(parent, fn);
        }

        AsyncHandleStage(BaseStage<T> parent,
                BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
            super(parent, fn, executor);
        }

        @Override
        boolean isAsync() {
            return true;
        }
    }

    /**
     * Stage created by {@code thenCompose}: feeds the parent's result to {@code fn} and then
     * evaluates the stage that {@code fn} returns.
     */
    static final class ComposeStage<T, U> extends ChildStage<T, U> {
        private final Function<? super T, ? extends CompletionStage<U>> fn;

        ComposeStage(BaseStage<T> parent,
                Function<? super T, ? extends CompletionStage<U>> fn) {
            super(parent);
            this.fn = fn;
        }

        @Override
        protected U doGet() throws Throwable {
            T parentResult = parentGet();
            // Only BaseStage instances are ever produced by this fixture.
            BaseStage<U> next = (BaseStage<U>) fn.apply(parentResult);
            return next.get();
        }
    }

    /** Root stage whose value comes from an optional {@link Supplier}. */
    static final class SupplierStage<T> extends BaseStage<T> {
        private final Supplier<T> fn;

        SupplierStage() {
            this.fn = null;
        }

        SupplierStage(Supplier<T> fn) {
            this.fn = fn;
        }

        /** Returns the supplier's value, or {@code null} when no supplier was given. */
        @Override
        public T doGet() throws Throwable {
            return (fn != null) ? fn.get() : null;
        }
    }

    /** Root stage that is constructed with its result already available. */
    static final class ResolvedStage<T> extends BaseStage<T> {

        ResolvedStage(T result) {
            super(result);
        }

        @Override
        public T doGet() throws Throwable {
            // Not reached when a non-null result was supplied, because get() returns the
            // stored result first.
            return null;
        }
    }

    /**
     * Wrapper thrown by {@link BaseStage#get()} for a failed stage; also records whether it
     * was delivered through an async stage.
     */
    static final class TestCompletionException extends Throwable {
        private boolean asyncHandled;

        TestCompletionException(Throwable cause) {
            super(cause);
        }

        void markAsync() {
            asyncHandled = true;
        }

        boolean isAsyncHandled() {
            return asyncHandled;
        }
    }

    /** Executor that runs each command on the calling thread and counts the runs. */
    static final class TestExecutor implements Executor {
        private int executions;

        @Override
        public void execute(Runnable command) {
            ++executions;
            command.run();
        }

        boolean wasRun() {
            return executions > 0;
        }
    }

    /**
     * Base for the recovery functions handed to the {@code exceptionally*} methods; records
     * how it was invoked so the tests can assert on it.
     */
    abstract static class ExceptionalFunction<T, U>
            implements Function<TestCompletionException, U> {
        private final T resultObj;
        private int invocationCount;
        private Throwable exception;
        private boolean asyncCall;

        ExceptionalFunction(T result) {
            resultObj = result;
        }

        /** Records one invocation, the unwrapped cause, and whether delivery was async. */
        protected void markInvoked(TestCompletionException ex) {
            invocationCount++;
            exception = ex.getCause();
            if (ex.isAsyncHandled()) {
                markAsync();
            }
        }

        protected T getResult() {
            return resultObj;
        }

        boolean wasInvoked() {
            return invocationCount > 0;
        }

        boolean wasAsync() {
            return asyncCall;
        }

        void markAsync() {
            asyncCall = true;
        }

        /** Returns the cause of the exception passed to the most recent invocation. */
        Throwable getException() {
            return exception;
        }
    }

    /** Recovery function that yields the configured result directly. */
    static final class ExceptionalCompletionFunction<T> extends ExceptionalFunction<T, T> {

        ExceptionalCompletionFunction(T result) {
            super(result);
        }

        @Override
        public T apply(TestCompletionException ex) {
            markInvoked(ex);
            return getResult();
        }
    }

    /** Recovery function that yields the configured result wrapped in a resolved stage. */
    static final class ExceptionalCompletionStageFunction<T>
            extends ExceptionalFunction<T, CompletionStage<T>> {

        ExceptionalCompletionStageFunction(T result) {
            super(result);
        }

        @Override
        public CompletionStage<T> apply(TestCompletionException ex) {
            markInvoked(ex);
            return new ResolvedStage<T>(getResult());
        }
    }

    /**
     * default exceptionallyAsync will provide for a stage with a new CompletionStage which will
     * run asynchronously if the stage completes exceptionally.
     *
     * Validate that the supplied function was run on exceptional completion and it received the
     * original exception.
     */
    // Raw types are intentional: the recovery function is typed on TestCompletionException
    // rather than Throwable, so it can only be passed through an unchecked conversion.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    public void testExceptionallyAsync() throws Throwable {
        BaseStage stage = new SupplierStage();
        Object expectedResult = new Object();
        Throwable expectedException = new RuntimeException();
        ExceptionalCompletionFunction fn = new ExceptionalCompletionFunction(expectedResult);

        stage.completeExceptionally(expectedException);
        stage = (BaseStage) stage.exceptionallyAsync(fn);
        Object result = stage.get();

        assertTrue(fn.wasInvoked());
        assertTrue(fn.wasAsync());
        assertSame(expectedException, fn.getException());
        assertSame(expectedResult, result);
    }

    /**
     * default exceptionallyAsync will provide for a stage with a new CompletionStage which will
     * run asynchronously if the stage completes exceptionally. The new CompletionStage will be run
     * in a provided executor.
     *
     * Validate that the supplied function was run on exceptional completion and it received the
     * original exception. The run of the function occurred within the provided executor.
     */
    // Raw types are intentional; see testExceptionallyAsync.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    public void testExceptionallyAsyncWithExecutor() throws Throwable {
        BaseStage stage = new SupplierStage();
        Object expectedResult = new Object();
        Throwable expectedException = new RuntimeException();
        ExceptionalCompletionFunction fn = new ExceptionalCompletionFunction(expectedResult);
        TestExecutor executor = new TestExecutor();

        stage.completeExceptionally(expectedException);
        stage = (BaseStage) stage.exceptionallyAsync(fn, executor);
        Object result = stage.get();

        assertTrue(fn.wasInvoked());
        assertTrue(fn.wasAsync());
        assertSame(expectedException, fn.getException());
        assertSame(expectedResult, result);
        assertTrue(executor.wasRun());
    }

    /**
     * default exceptionallyCompose will compose a stage with a new CompletionStage which will
     * run if the stage completes exceptionally.
     *
     * Validate that the supplied function was run (synchronously) on exceptional completion and
     * it received the original exception.
     */
    // Raw types are intentional; see testExceptionallyAsync.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    public void testExceptionallyCompose() throws Throwable {
        BaseStage stage = new SupplierStage();
        Object expectedResult = new Object();
        Throwable expectedException = new RuntimeException();
        ExceptionalCompletionStageFunction fn =
                new ExceptionalCompletionStageFunction(expectedResult);

        stage.completeExceptionally(expectedException);
        stage = (BaseStage) stage.exceptionallyCompose(fn);
        Object result = stage.get();

        assertTrue(fn.wasInvoked());
        assertFalse(fn.wasAsync());
        assertSame(expectedException, fn.getException());
        assertSame(expectedResult, result);
    }

    /**
     * default exceptionallyComposeAsync will compose a stage with a new CompletionStage which will
     * run asynchronously if the stage completes exceptionally.
     *
     * Validate that the supplied function was run on exceptional completion and it received the
     * original exception.
     */
    // Raw types are intentional; see testExceptionallyAsync.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    public void testExceptionallyComposeAsync() throws Throwable {
        BaseStage stage = new SupplierStage();
        Object expectedResult = new Object();
        Throwable expectedException = new RuntimeException();
        ExceptionalCompletionStageFunction fn =
                new ExceptionalCompletionStageFunction(expectedResult);

        stage.completeExceptionally(expectedException);
        stage = (BaseStage) stage.exceptionallyComposeAsync(fn);
        Object result = stage.get();

        assertTrue(fn.wasInvoked());
        assertTrue(fn.wasAsync());
        assertSame(expectedException, fn.getException());
        assertSame(expectedResult, result);
    }

    /**
     * default exceptionallyComposeAsync will compose a stage with a new CompletionStage which will
     * run asynchronously if the stage completes exceptionally. The new CompletionStage will be run
     * in a provided executor.
     *
     * Validate that the supplied function was run on exceptional completion and it received the
     * original exception. The run of the function occurred within the provided executor.
     */
    // Raw types are intentional; see testExceptionallyAsync.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    public void testExceptionallyComposeAsyncWithExecutor() throws Throwable {
        BaseStage stage = new SupplierStage();
        Object expectedResult = new Object();
        Throwable expectedException = new RuntimeException();
        ExceptionalCompletionStageFunction fn =
                new ExceptionalCompletionStageFunction(expectedResult);
        TestExecutor executor = new TestExecutor();

        stage.completeExceptionally(expectedException);
        stage = (BaseStage) stage.exceptionallyComposeAsync(fn, executor);
        Object result = stage.get();

        assertTrue(fn.wasInvoked());
        assertTrue(fn.wasAsync());
        assertSame(expectedException, fn.getException());
        assertSame(expectedResult, result);
        assertTrue(executor.wasRun());
    }

}