001package org.reactivestreams.tck; 002 003import org.reactivestreams.Publisher; 004import org.reactivestreams.Subscriber; 005import org.reactivestreams.Subscription; 006import org.reactivestreams.tck.support.SubscriberBufferOverflowException; 007import org.reactivestreams.tck.support.Optional; 008 009import java.util.LinkedList; 010import java.util.List; 011import java.util.concurrent.ArrayBlockingQueue; 012import java.util.concurrent.CopyOnWriteArrayList; 013import java.util.concurrent.CountDownLatch; 014import java.util.concurrent.TimeUnit; 015 016import static org.testng.Assert.assertTrue; 017import static org.testng.Assert.fail; 018 019public class TestEnvironment { 020 public static final int TEST_BUFFER_SIZE = 16; 021 022 private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS"; 023 private static final long DEFAULT_TIMEOUT_MILLIS = 100; 024 025 private final long defaultTimeoutMillis; 026 private final boolean printlnDebug; 027 028 private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>(); 029 030 /** 031 * Tests must specify the timeout for expected outcome of asynchronous 032 * interactions. Longer timeout does not invalidate the correctness of 033 * the implementation, but can in some cases result in longer time to 034 * run the tests. 035 * 036 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 037 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 038 * often helpful to pinpoint simple race conditions etc. 039 */ 040 public TestEnvironment(long defaultTimeoutMillis, boolean printlnDebug) { 041 this.defaultTimeoutMillis = defaultTimeoutMillis; 042 this.printlnDebug = printlnDebug; 043 } 044 045 /** 046 * Tests must specify the timeout for expected outcome of asynchronous 047 * interactions. Longer timeout does not invalidate the correctness of 048 * the implementation, but can in some cases result in longer time to 049 * run the tests. 050 * 051 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 052 */ 053 public TestEnvironment(long defaultTimeoutMillis) { 054 this(defaultTimeoutMillis, false); 055 } 056 057 /** 058 * Tests must specify the timeout for expected outcome of asynchronous 059 * interactions. Longer timeout does not invalidate the correctness of 060 * the implementation, but can in some cases result in longer time to 061 * run the tests. 062 * 063 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 064 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 065 * 066 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 067 * often helpful to pinpoint simple race conditions etc. 068 */ 069 public TestEnvironment(boolean printlnDebug) { 070 this(envDefaultTimeoutMillis(), printlnDebug); 071 } 072 073 /** 074 * Tests must specify the timeout for expected outcome of asynchronous 075 * interactions. Longer timeout does not invalidate the correctness of 076 * the implementation, but can in some cases result in longer time to 077 * run the tests. 078 * 079 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 080 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 081 */ 082 public TestEnvironment() { 083 this(envDefaultTimeoutMillis()); 084 } 085 086 public long defaultTimeoutMillis() { 087 return defaultTimeoutMillis; 088 } 089 090 /** 091 * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 092 * 093 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 094 */ 095 public static long envDefaultTimeoutMillis() { 096 final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV); 097 if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS; 098 else try { 099 return Long.parseLong(envMillis); 100 } catch(NumberFormatException ex) { 101 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex); 102 } 103 } 104 105 /** 106 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 107 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 108 * 109 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 110 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 111 * from the environment using {@code env.dropAsyncError()}. 112 * 113 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 114 */ 115 public void flop(String msg) { 116 try { 117 fail(msg); 118 } catch (Throwable t) { 119 asyncErrors.add(t); 120 } 121 } 122 123 /** 124 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 125 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 126 * 127 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 128 * 129 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 130 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 131 * from the environment using {@code env.dropAsyncError()}. 132 * 133 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 134 */ 135 public void flop(Throwable thr, String msg) { 136 try { 137 fail(msg, thr); 138 } catch (Throwable t) { 139 asyncErrors.add(thr); 140 } 141 } 142 143 /** 144 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 145 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 146 * 147 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 148 * 149 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 150 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 151 * from the environment using {@code env.dropAsyncError()}. 152 * 153 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 154 */ 155 public void flop(Throwable thr) { 156 try { 157 fail(thr.getMessage(), thr); 158 } catch (Throwable t) { 159 asyncErrors.add(thr); 160 } 161 } 162 163 /** 164 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 165 * 166 * This method DOES fail the test right away (it tries to, by throwing an AssertionException), 167 * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error. 168 * 169 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 170 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 171 * from the environment using {@code env.dropAsyncError()}. 172 * 173 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 174 */ 175 public <T> T flopAndFail(String msg) { 176 try { 177 fail(msg); 178 } catch (Throwable t) { 179 asyncErrors.add(t); 180 fail(msg, t); 181 } 182 return null; // unreachable, the previous block will always exit by throwing 183 } 184 185 186 187 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException { 188 subscribe(pub, sub, defaultTimeoutMillis); 189 } 190 191 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException { 192 pub.subscribe(sub); 193 sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); 194 verifyNoAsyncErrorsNoDelay(); 195 } 196 197 public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException { 198 ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this); 199 subscribe(pub, sub, defaultTimeoutMillis()); 200 return sub; 201 } 202 203 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException { 204 return newManualSubscriber(pub, defaultTimeoutMillis()); 205 } 206 207 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException { 208 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this); 209 subscribe(pub, sub, timeoutMillis); 210 return sub; 211 } 212 213 public void clearAsyncErrors() { 214 asyncErrors.clear(); 215 } 216 217 public Throwable dropAsyncError() { 218 try { 219 return asyncErrors.remove(0); 220 } catch (IndexOutOfBoundsException ex) { 221 return null; 222 } 223 } 224 225 /** 226 * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors 227 * were signalled pior to, or during that time (by calling {@code flop()}). 228 */ 229 public void verifyNoAsyncErrors() { 230 verifyNoAsyncErrors(defaultTimeoutMillis()); 231 } 232 233 /** 234 * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled 235 * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time. 236 * <p></p> 237 * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)}, 238 * and if no errors encountered wait for another default timeout as the errors may yet be signalled. 239 * The initial check is performed in order to fail-fast in case of an already failed test. 240 */ 241 public void verifyNoAsyncErrors(long delay) { 242 try { 243 verifyNoAsyncErrorsNoDelay(); 244 245 Thread.sleep(delay); 246 verifyNoAsyncErrorsNoDelay(); 247 } catch (InterruptedException e) { 248 throw new RuntimeException(e); 249 } 250 } 251 252 /** 253 * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}). 254 * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used 255 * for example in tight loops etc. 256 */ 257 public void verifyNoAsyncErrorsNoDelay() { 258 for (Throwable e : asyncErrors) { 259 if (e instanceof AssertionError) { 260 throw (AssertionError) e; 261 } else { 262 fail(String.format("Async error during test execution: %s", e.getMessage()), e); 263 } 264 } 265 } 266 267 /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */ 268 public void debug(String msg) { 269 if (printlnDebug) 270 System.out.printf("[TCK-DEBUG] %s%n", msg); 271 } 272 273 /** 274 * Looks for given {@code method} method in stack trace. 275 * Can be used to answer questions like "was this method called from onComplete?". 276 * 277 * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise 278 */ 279 public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) { 280 final Throwable thr = new Throwable(); // gets the stacktrace 281 282 for (StackTraceElement stackElement : thr.getStackTrace()) { 283 if (stackElement.getMethodName().equals(method)) { 284 return Optional.of(stackElement); 285 } 286 } 287 return Optional.empty(); 288 } 289 290 // ---- classes ---- 291 292 /** 293 * {@link Subscriber} implementation which can be steered by test code and asserted on. 294 */ 295 public static class ManualSubscriber<T> extends TestSubscriber<T> { 296 Receptacle<T> received; 297 298 public ManualSubscriber(TestEnvironment env) { 299 super(env); 300 received = new Receptacle<T>(this.env); 301 } 302 303 @Override 304 public void onNext(T element) { 305 try { 306 received.add(element); 307 } catch (IllegalStateException ex) { 308 // error message refinement 309 throw new SubscriberBufferOverflowException( 310 String.format("Received more than bufferSize (%d) onNext signals. " + 311 "The Publisher probably emited more signals than expected!", 312 received.QUEUE_SIZE), ex); 313 } 314 } 315 316 @Override 317 public void onComplete() { 318 received.complete(); 319 } 320 321 public void request(long elements) { 322 subscription.value().request(elements); 323 } 324 325 public T requestNextElement() throws InterruptedException { 326 return requestNextElement(env.defaultTimeoutMillis()); 327 } 328 329 public T requestNextElement(long timeoutMillis) throws InterruptedException { 330 return requestNextElement(timeoutMillis, "Did not receive expected element"); 331 } 332 333 public T requestNextElement(String errorMsg) throws InterruptedException { 334 return requestNextElement(env.defaultTimeoutMillis(), errorMsg); 335 } 336 337 public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 338 request(1); 339 return nextElement(timeoutMillis, errorMsg); 340 } 341 342 public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException { 343 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg); 344 } 345 346 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 347 return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 348 } 349 350 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 351 request(1); 352 return nextElementOrEndOfStream(timeoutMillis, errorMsg); 353 } 354 355 public void requestEndOfStream() throws InterruptedException { 356 requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 357 } 358 359 public void requestEndOfStream(long timeoutMillis) throws InterruptedException { 360 requestEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 361 } 362 363 public void requestEndOfStream(String errorMsg) throws InterruptedException { 364 requestEndOfStream(env.defaultTimeoutMillis(), errorMsg); 365 } 366 367 public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 368 request(1); 369 expectCompletion(timeoutMillis, errorMsg); 370 } 371 372 public List<T> requestNextElements(long elements) throws InterruptedException { 373 request(elements); 374 return nextElements(elements, env.defaultTimeoutMillis()); 375 } 376 377 public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException { 378 request(elements); 379 return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements)); 380 } 381 382 public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 383 request(elements); 384 return nextElements(elements, timeoutMillis, errorMsg); 385 } 386 387 public T nextElement() throws InterruptedException { 388 return nextElement(env.defaultTimeoutMillis()); 389 } 390 391 public T nextElement(long timeoutMillis) throws InterruptedException { 392 return nextElement(timeoutMillis, "Did not receive expected element"); 393 } 394 395 public T nextElement(String errorMsg) throws InterruptedException { 396 return nextElement(env.defaultTimeoutMillis(), errorMsg); 397 } 398 399 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 400 return received.next(timeoutMillis, errorMsg); 401 } 402 403 public Optional<T> nextElementOrEndOfStream() throws InterruptedException { 404 return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 405 } 406 407 public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 408 return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 409 } 410 411 public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 412 return received.nextOrEndOfStream(timeoutMillis, errorMsg); 413 } 414 415 public List<T> nextElements(long elements) throws InterruptedException { 416 return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion"); 417 } 418 419 public List<T> nextElements(long elements, String errorMsg) throws InterruptedException { 420 return nextElements(elements, env.defaultTimeoutMillis(), errorMsg); 421 } 422 423 public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException { 424 return nextElements(elements, timeoutMillis, "Did not receive expected element or completion"); 425 } 426 427 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 428 return received.nextN(elements, timeoutMillis, errorMsg); 429 } 430 431 public void expectNext(T expected) throws InterruptedException { 432 expectNext(expected, env.defaultTimeoutMillis()); 433 } 434 435 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 436 T received = nextElement(timeoutMillis, "Did not receive expected element on downstream"); 437 if (!received.equals(expected)) { 438 env.flop(String.format("Expected element %s on downstream but received %s", expected, received)); 439 } 440 } 441 442 public void expectCompletion() throws InterruptedException { 443 expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 444 } 445 446 public void expectCompletion(long timeoutMillis) throws InterruptedException { 447 expectCompletion(timeoutMillis, "Did not receive expected stream completion"); 448 } 449 450 public void expectCompletion(String errorMsg) throws InterruptedException { 451 expectCompletion(env.defaultTimeoutMillis(), errorMsg); 452 } 453 454 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 455 received.expectCompletion(timeoutMillis, errorMsg); 456 } 457 458 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception { 459 expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis()); 460 } 461 462 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 463 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception { 464 final E err = expectError(expected, timeoutMillis); 465 final String message = err.getMessage(); 466 assertTrue(message.contains(requiredMessagePart), 467 String.format("Got expected exception [%s] but missing message part [%s], was: %s", 468 err.getClass(), requiredMessagePart, err.getMessage())); 469 } 470 471 public <E extends Throwable> E expectError(Class<E> expected) throws Exception { 472 return expectError(expected, env.defaultTimeoutMillis()); 473 } 474 475 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception { 476 return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName())); 477 } 478 479 public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception { 480 return expectError(expected, env.defaultTimeoutMillis(), errorMsg); 481 } 482 483 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception { 484 return received.expectError(expected, timeoutMillis, errorMsg); 485 } 486 487 public void expectNone() throws InterruptedException { 488 expectNone(env.defaultTimeoutMillis()); 489 } 490 491 public void expectNone(String errMsgPrefix) throws InterruptedException { 492 expectNone(env.defaultTimeoutMillis(), errMsgPrefix); 493 } 494 495 public void expectNone(long withinMillis) throws InterruptedException { 496 expectNone(withinMillis, "Did not expect an element but got element"); 497 } 498 499 public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException { 500 received.expectNone(withinMillis, errMsgPrefix); 501 } 502 503 } 504 505 public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> { 506 507 public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) { 508 super(env); 509 } 510 511 @Override 512 public void onNext(T element) { 513 env.debug(String.format("%s::onNext(%s)", this, element)); 514 if (subscription.isCompleted()) { 515 super.onNext(element); 516 } else { 517 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 518 } 519 } 520 521 @Override 522 public void onComplete() { 523 env.debug(this + "::onComplete()"); 524 if (subscription.isCompleted()) { 525 super.onComplete(); 526 } else { 527 env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe"); 528 } 529 } 530 531 @Override 532 public void onSubscribe(Subscription s) { 533 env.debug(String.format("%s::onSubscribe(%s)", this, s)); 534 if (!subscription.isCompleted()) { 535 subscription.complete(s); 536 } else { 537 env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber"); 538 } 539 } 540 541 @Override 542 public void onError(Throwable cause) { 543 env.debug(String.format("%s::onError(%s)", this, cause)); 544 if (subscription.isCompleted()) { 545 super.onError(cause); 546 } else { 547 env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause)); 548 } 549 } 550 } 551 552 /** 553 * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport} 554 * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert 555 * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher. 556 */ 557 public static class BlackholeSubscriberWithSubscriptionSupport<T> 558 extends ManualSubscriberWithSubscriptionSupport<T> { 559 560 public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) { 561 super(env); 562 } 563 564 @Override 565 public void onNext(T element) { 566 env.debug(String.format("%s::onNext(%s)", this, element)); 567 if (!subscription.isCompleted()) { 568 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 569 } 570 } 571 572 @Override 573 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 574 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 575 } 576 577 @Override 578 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 579 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 580 } 581 } 582 583 public static class TestSubscriber<T> implements Subscriber<T> { 584 final Promise<Subscription> subscription; 585 586 protected final TestEnvironment env; 587 588 public TestSubscriber(TestEnvironment env) { 589 this.env = env; 590 subscription = new Promise<Subscription>(env); 591 } 592 593 @Override 594 public void onError(Throwable cause) { 595 env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause)); 596 } 597 598 @Override 599 public void onComplete() { 600 env.flop("Unexpected Subscriber::onComplete()"); 601 } 602 603 @Override 604 public void onNext(T element) { 605 env.flop(String.format("Unexpected Subscriber::onNext(%s)", element)); 606 } 607 608 @Override 609 public void onSubscribe(Subscription subscription) { 610 env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription)); 611 } 612 613 public void cancel() { 614 if (subscription.isCompleted()) { 615 subscription.value().cancel(); 616 } else { 617 env.flop("Cannot cancel a subscription before having received it"); 618 } 619 } 620 } 621 622 public static class ManualPublisher<T> implements Publisher<T> { 623 protected final TestEnvironment env; 624 625 protected long pendingDemand = 0L; 626 protected Promise<Subscriber<? super T>> subscriber; 627 628 protected final Receptacle<Long> requests; 629 630 protected final Latch cancelled; 631 632 public ManualPublisher(TestEnvironment env) { 633 this.env = env; 634 requests = new Receptacle<Long>(env); 635 cancelled = new Latch(env); 636 subscriber = new Promise<Subscriber<? super T>>(this.env); 637 } 638 639 @Override 640 public void subscribe(Subscriber<? super T> s) { 641 if (!subscriber.isCompleted()) { 642 subscriber.completeImmediatly(s); 643 644 Subscription subs = new Subscription() { 645 @Override 646 public void request(long elements) { 647 requests.add(elements); 648 } 649 650 @Override 651 public void cancel() { 652 cancelled.close(); 653 } 654 }; 655 s.onSubscribe(subs); 656 657 } else { 658 env.flop("TestPublisher doesn't support more than one Subscriber"); 659 } 660 } 661 662 public void sendNext(T element) { 663 if (subscriber.isCompleted()) { 664 subscriber.value().onNext(element); 665 } else { 666 env.flop("Cannot sendNext before having a Subscriber"); 667 } 668 } 669 670 public void sendCompletion() { 671 if (subscriber.isCompleted()) { 672 subscriber.value().onComplete(); 673 } else { 674 env.flop("Cannot sendCompletion before having a Subscriber"); 675 } 676 } 677 678 public void sendError(Throwable cause) { 679 if (subscriber.isCompleted()) { 680 subscriber.value().onError(cause); 681 } else { 682 env.flop("Cannot sendError before having a Subscriber"); 683 } 684 } 685 686 public long expectRequest() throws InterruptedException { 687 return expectRequest(env.defaultTimeoutMillis()); 688 } 689 690 public long expectRequest(long timeoutMillis) throws InterruptedException { 691 long requested = requests.next(timeoutMillis, "Did not receive expected `request` call"); 692 if (requested <= 0) { 693 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 694 } else { 695 pendingDemand += requested; 696 return requested; 697 } 698 } 699 700 public void expectExactRequest(long expected) throws InterruptedException { 701 expectExactRequest(expected, env.defaultTimeoutMillis()); 702 } 703 704 public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException { 705 long requested = expectRequest(timeoutMillis); 706 if (requested != expected) { 707 env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected)); 708 } 709 pendingDemand += requested; 710 } 711 712 public void expectNoRequest() throws InterruptedException { 713 expectNoRequest(env.defaultTimeoutMillis()); 714 } 715 716 public void expectNoRequest(long timeoutMillis) throws InterruptedException { 717 requests.expectNone(timeoutMillis, "Received an unexpected call to: request: "); 718 } 719 720 public void expectCancelling() throws InterruptedException { 721 expectCancelling(env.defaultTimeoutMillis()); 722 } 723 724 public void expectCancelling(long timeoutMillis) throws InterruptedException { 725 cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription"); 726 } 727 } 728 729 /** 730 * Like a CountDownLatch, but resettable and with some convenience methods 731 */ 732 public static class Latch { 733 private final TestEnvironment env; 734 volatile private CountDownLatch countDownLatch = new CountDownLatch(1); 735 736 public Latch(TestEnvironment env) { 737 this.env = env; 738 } 739 740 public void reOpen() { 741 countDownLatch = new CountDownLatch(1); 742 } 743 744 public boolean isClosed() { 745 return countDownLatch.getCount() == 0; 746 } 747 748 public void close() { 749 countDownLatch.countDown(); 750 } 751 752 public void assertClosed(String openErrorMsg) { 753 if (!isClosed()) { 754 env.flop(new ExpectedClosedLatchException(openErrorMsg)); 755 } 756 } 757 758 public void assertOpen(String closedErrorMsg) { 759 if (isClosed()) { 760 env.flop(new ExpectedOpenLatchException(closedErrorMsg)); 761 } 762 } 763 764 public void expectClose(String notClosedErrorMsg) throws InterruptedException { 765 expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg); 766 } 767 768 public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException { 769 countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); 770 if (countDownLatch.getCount() > 0) { 771 env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis)); 772 } 773 } 774 775 static final class ExpectedOpenLatchException extends RuntimeException { 776 public ExpectedOpenLatchException(String message) { 777 super(message); 778 } 779 } 780 781 static final class ExpectedClosedLatchException extends RuntimeException { 782 public ExpectedClosedLatchException(String message) { 783 super(message); 784 } 785 } 786 787 } 788 789 // simple promise for *one* value, which cannot be reset 790 public static class Promise<T> { 791 private final TestEnvironment env; 792 793 public static <T> Promise<T> completed(TestEnvironment env, T value) { 794 Promise<T> promise = new Promise<T>(env); 795 promise.completeImmediatly(value); 796 return promise; 797 } 798 799 public Promise(TestEnvironment env) { 800 this.env = env; 801 } 802 803 private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1); 804 private volatile T _value = null; 805 806 public T value() { 807 if (isCompleted()) { 808 return _value; 809 } else { 810 env.flop("Cannot access promise value before completion"); 811 return null; 812 } 813 } 814 815 public boolean isCompleted() { 816 return _value != null; 817 } 818 819 /** 820 * Allows using expectCompletion to await for completion of the value and complete it _then_ 821 */ 822 public void complete(T value) { 823 abq.add(value); 824 } 825 826 /** 827 * Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way 828 */ 829 public void completeImmediatly(T value) { 830 complete(value); // complete! 831 _value = value; // immediatly! 832 } 833 834 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 835 if (!isCompleted()) { 836 T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 837 838 if (val == null) { 839 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 840 } else { 841 _value = val; 842 } 843 } 844 } 845 } 846 847 // a "Promise" for multiple values, which also supports "end-of-stream reached" 848 public static class Receptacle<T> { 849 final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE; 850 private final TestEnvironment env; 851 852 private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE); 853 854 private final Latch completedLatch; 855 856 Receptacle(TestEnvironment env) { 857 this.env = env; 858 this.completedLatch = new Latch(env); 859 } 860 861 public void add(T value) { 862 completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value)); 863 864 abq.add(Optional.of(value)); 865 } 866 867 public void complete() { 868 completedLatch.assertOpen("Unexpected additional complete signal received!"); 869 completedLatch.close(); 870 871 abq.add(Optional.<T>empty()); 872 } 873 874 public T next(long timeoutMillis, String errorMsg) throws InterruptedException { 875 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 876 877 if (value == null) { 878 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 879 } else if (value.isDefined()) { 880 return value.get(); 881 } else { 882 return env.flopAndFail("Expected element but got end-of-stream"); 883 } 884 } 885 886 public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 887 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 888 889 if (value == null) { 890 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 891 return Optional.empty(); 892 } 893 894 return value; 895 } 896 897 /** 898 * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements 899 */ 900 public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 901 List<T> result = new LinkedList<T>(); 902 long remaining = elements; 903 long deadline = System.currentTimeMillis() + timeoutMillis; 904 while (remaining > 0) { 905 long remainingMillis = deadline - System.currentTimeMillis(); 906 907 result.add(next(remainingMillis, errorMsg)); 908 remaining--; 909 } 910 911 return result; 912 } 913 914 915 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 916 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 917 918 if (value == null) { 919 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 920 } else if (value.isDefined()) { 921 env.flop(String.format("Expected end-of-stream but got element [%s]", value.get())); 922 } // else, ok 923 } 924 925 @SuppressWarnings("unchecked") 926 public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception { 927 Thread.sleep(timeoutMillis); 928 929 if (env.asyncErrors.isEmpty()) { 930 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 931 } else { 932 // ok, there was an expected error 933 Throwable thrown = env.asyncErrors.remove(0); 934 935 if (clazz.isInstance(thrown)) { 936 return (E) thrown; 937 } else { 938 939 return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s", 940 errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName())); 941 } 942 } 943 } 944 945 public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException { 946 Thread.sleep(withinMillis); 947 Optional<T> value = abq.poll(); 948 949 if (value == null) { 950 // ok 951 } else if (value.isDefined()) { 952 env.flop(String.format("%s [%s]", errorMsgPrefix, value.get())); 953 } else { 954 env.flop("Expected no element but got end-of-stream"); 955 } 956 } 957 } 958} 959