001package org.reactivestreams.tck; 002 003import org.reactivestreams.Publisher; 004import org.reactivestreams.Subscriber; 005import org.reactivestreams.Subscription; 006import org.reactivestreams.tck.TestEnvironment.*; 007import org.reactivestreams.tck.support.Optional; 008import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules; 009import org.reactivestreams.tck.support.TestException; 010import org.testng.SkipException; 011import org.testng.annotations.AfterClass; 012import org.testng.annotations.BeforeClass; 013import org.testng.annotations.BeforeMethod; 014import org.testng.annotations.Test; 015 016import java.util.concurrent.ExecutorService; 017import java.util.concurrent.Executors; 018 019import static org.testng.Assert.assertTrue; 020 021/** 022 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} specification rules. 023 * 024 * @see org.reactivestreams.Subscriber 025 * @see org.reactivestreams.Subscription 026 */ 027public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T> 028 implements SubscriberWhiteboxVerificationRules { 029 030 private final TestEnvironment env; 031 032 protected SubscriberWhiteboxVerification(TestEnvironment env) { 033 this.env = env; 034 } 035 036 // USER API 037 038 /** 039 * This is the main method you must implement in your test incarnation. 040 * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. 041 * 042 * In order to be meaningfully testable your Subscriber must inform the given 043 * `WhiteboxSubscriberProbe` of the respective events having been received. 044 */ 045 public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe); 046 047 // ENV SETUP 048 049 /** 050 * Executor service used by the default provided asynchronous Publisher. 051 * @see #createHelperPublisher(long) 052 */ 053 private ExecutorService publisherExecutor; 054 @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } 055 @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } 056 @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } 057 058 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 059 060 @BeforeMethod 061 public void setUp() throws Exception { 062 env.clearAsyncErrors(); 063 } 064 065 ////////////////////// TEST SETUP VERIFICATION ////////////////////////////// 066 067 @Test 068 public void required_exerciseWhiteboxHappyPath() throws Throwable { 069 subscriberTest(new TestStageTestRun() { 070 @Override 071 public void run(WhiteboxTestStage stage) throws InterruptedException { 072 stage.puppet().triggerRequest(1); 073 stage.puppet().triggerRequest(1); 074 075 long receivedRequests = stage.expectRequest(); 076 077 stage.signalNext(); 078 stage.probe.expectNext(stage.lastT); 079 080 stage.puppet().triggerRequest(1); 081 if (receivedRequests == 1) { 082 stage.expectRequest(); 083 } 084 085 stage.signalNext(); 086 stage.probe.expectNext(stage.lastT); 087 088 stage.puppet().signalCancel(); 089 stage.expectCancelling(); 090 091 stage.verifyNoAsyncErrors(); 092 } 093 }); 094 } 095 096 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 097 098 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1 099 @Override @Test 100 public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable { 101 subscriberTest(new TestStageTestRun() { 102 @Override 103 public void run(WhiteboxTestStage stage) throws InterruptedException { 104 stage.puppet().triggerRequest(1); 105 stage.expectRequest(); 106 107 stage.signalNext(); 108 } 109 }); 110 } 111 112 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2 113 @Override @Test 114 public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception { 115 notVerified(); // cannot be meaningfully tested, or can it? 116 } 117 118 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 119 @Override @Test 120 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 121 subscriberTestWithoutSetup(new TestStageTestRun() { 122 @Override 123 public void run(WhiteboxTestStage stage) throws Throwable { 124 final Subscription subs = new Subscription() { 125 @Override 126 public void request(long n) { 127 final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete"); 128 if (onCompleteStackTraceElement.isDefined()) { 129 final StackTraceElement stackElem = onCompleteStackTraceElement.get(); 130 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 131 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 132 } 133 } 134 135 @Override 136 public void cancel() { 137 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete"); 138 if (onCompleteStackElement.isDefined()) { 139 final StackTraceElement stackElem = onCompleteStackElement.get(); 140 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 141 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 142 } 143 } 144 }; 145 146 stage.probe = stage.createWhiteboxSubscriberProbe(env); 147 final Subscriber<T> sub = createSubscriber(stage.probe); 148 149 sub.onSubscribe(subs); 150 sub.onComplete(); 151 152 env.verifyNoAsyncErrorsNoDelay(); 153 } 154 }); 155 } 156 157 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 158 @Override @Test 159 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 160 subscriberTestWithoutSetup(new TestStageTestRun() { 161 @Override 162 public void run(WhiteboxTestStage stage) throws Throwable { 163 final Subscription subs = new Subscription() { 164 @Override 165 public void request(long n) { 166 Throwable thr = new Throwable(); 167 for (StackTraceElement stackElem : thr.getStackTrace()) { 168 if (stackElem.getMethodName().equals("onError")) { 169 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 170 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 171 } 172 } 173 } 174 175 @Override 176 public void cancel() { 177 Throwable thr = new Throwable(); 178 for (StackTraceElement stackElem : thr.getStackTrace()) { 179 if (stackElem.getMethodName().equals("onError")) { 180 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 181 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 182 } 183 } 184 } 185 }; 186 187 stage.probe = stage.createWhiteboxSubscriberProbe(env); 188 final Subscriber<T> sub = createSubscriber(stage.probe); 189 190 sub.onSubscribe(subs); 191 sub.onError(new TestException()); 192 193 env.verifyNoAsyncErrorsNoDelay(); 194 } 195 }); 196 } 197 198 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4 199 @Override @Test 200 public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 201 notVerified(); // cannot be meaningfully tested, or can it? 202 } 203 204 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5 205 @Override @Test 206 public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { 207 subscriberTest(new TestStageTestRun() { 208 @Override 209 public void run(WhiteboxTestStage stage) throws Throwable { 210 // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail 211 final Latch secondSubscriptionCancelled = new Latch(env); 212 final Subscriber<? super T> sub = stage.sub(); 213 final Subscription subscription = new Subscription() { 214 @Override 215 public void request(long elements) { 216 // ignore... 217 } 218 219 @Override 220 public void cancel() { 221 secondSubscriptionCancelled.close(); 222 } 223 224 @Override 225 public String toString() { 226 return "SecondSubscription(should get cancelled)"; 227 } 228 }; 229 sub.onSubscribe(subscription); 230 231 secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called"); 232 env.verifyNoAsyncErrors(); 233 } 234 }); 235 } 236 237 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6 238 @Override @Test 239 public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 240 notVerified(); // cannot be meaningfully tested, or can it? 241 } 242 243 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7 244 @Override @Test 245 public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 246 notVerified(); // cannot be meaningfully tested, or can it? 247 // the same thread part of the clause can be verified but that is not very useful, or is it? 248 } 249 250 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8 251 @Override @Test 252 public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 253 subscriberTest(new TestStageTestRun() { 254 @Override 255 public void run(WhiteboxTestStage stage) throws InterruptedException { 256 stage.puppet().triggerRequest(1); 257 stage.puppet().signalCancel(); 258 stage.signalNext(); 259 260 stage.puppet().triggerRequest(1); 261 stage.puppet().triggerRequest(1); 262 263 stage.verifyNoAsyncErrors(); 264 } 265 }); 266 } 267 268 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 269 @Override @Test 270 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 271 subscriberTest(new TestStageTestRun() { 272 @Override 273 public void run(WhiteboxTestStage stage) throws InterruptedException { 274 stage.puppet().triggerRequest(1); 275 stage.sendCompletion(); 276 stage.probe.expectCompletion(); 277 278 stage.verifyNoAsyncErrors(); 279 } 280 }); 281 } 282 283 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 284 @Override @Test 285 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 286 subscriberTest(new TestStageTestRun() { 287 @Override 288 public void run(WhiteboxTestStage stage) throws InterruptedException { 289 stage.sendCompletion(); 290 stage.probe.expectCompletion(); 291 292 stage.verifyNoAsyncErrors(); 293 } 294 }); 295 } 296 297 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 298 @Override @Test 299 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 300 subscriberTest(new TestStageTestRun() { 301 @Override 302 public void run(WhiteboxTestStage stage) throws InterruptedException { 303 stage.puppet().triggerRequest(1); 304 stage.puppet().triggerRequest(1); 305 306 Exception ex = new TestException(); 307 stage.sendError(ex); 308 stage.probe.expectError(ex); 309 310 env.verifyNoAsyncErrorsNoDelay(); 311 } 312 }); 313 } 314 315 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 316 @Override @Test 317 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 318 subscriberTest(new TestStageTestRun() { 319 @Override 320 public void run(WhiteboxTestStage stage) throws InterruptedException { 321 Exception ex = new TestException(); 322 stage.sendError(ex); 323 stage.probe.expectError(ex); 324 325 env.verifyNoAsyncErrorsNoDelay(); 326 } 327 }); 328 } 329 330 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11 331 @Override @Test 332 public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 333 notVerified(); // cannot be meaningfully tested, or can it? 334 } 335 336 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12 337 @Override @Test 338 public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable { 339 notVerified(); // cannot be meaningfully tested, or can it? 340 } 341 342 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 343 @Override @Test 344 public void untested_spec213_failingOnSignalInvocation() throws Exception { 345 notVerified(); // cannot be meaningfully tested, or can it? 346 } 347 348 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 349 @Override @Test 350 public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 351 subscriberTest(new TestStageTestRun() { 352 @Override 353 public void run(WhiteboxTestStage stage) throws Throwable { 354 355 final Subscriber<? super T> sub = stage.sub(); 356 boolean gotNPE = false; 357 try { 358 sub.onSubscribe(null); 359 } catch (final NullPointerException expected) { 360 gotNPE = true; 361 } 362 363 assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); 364 env.verifyNoAsyncErrorsNoDelay(); 365 } 366 }); 367 } 368 369 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 370 @Override @Test 371 public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 372 subscriberTest(new TestStageTestRun() { 373 @Override 374 public void run(WhiteboxTestStage stage) throws Throwable { 375 376 final Subscriber<? super T> sub = stage.sub(); 377 boolean gotNPE = false; 378 try { 379 sub.onNext(null); 380 } catch (final NullPointerException expected) { 381 gotNPE = true; 382 } 383 384 assertTrue(gotNPE, "onNext(null) did not throw NullPointerException"); 385 env.verifyNoAsyncErrorsNoDelay(); 386 } 387 }); 388 } 389 390 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 391 @Override @Test 392 public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 393 subscriberTest(new TestStageTestRun() { 394 @Override 395 public void run(WhiteboxTestStage stage) throws Throwable { 396 397 final Subscriber<? super T> sub = stage.sub(); 398 boolean gotNPE = false; 399 try { 400 sub.onError(null); 401 } catch (final NullPointerException expected) { 402 gotNPE = true; 403 } finally { 404 assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); 405 } 406 407 env.verifyNoAsyncErrorsNoDelay(); 408 } 409 }); 410 } 411 412 413 ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION ////////////////// 414 415 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1 416 @Override @Test 417 public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception { 418 notVerified(); // cannot be meaningfully tested, or can it? 419 } 420 421 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8 422 @Override @Test 423 public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 424 subscriberTest(new TestStageTestRun() { 425 @Override 426 public void run(WhiteboxTestStage stage) throws InterruptedException { 427 stage.puppet().triggerRequest(2); 428 stage.probe.expectNext(stage.signalNext()); 429 stage.probe.expectNext(stage.signalNext()); 430 431 stage.probe.expectNone(); 432 stage.puppet().triggerRequest(3); 433 434 stage.verifyNoAsyncErrors(); 435 } 436 }); 437 } 438 439 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10 440 @Override @Test 441 public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 442 notVerified(); // cannot be meaningfully tested, or can it? 443 } 444 445 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11 446 @Override @Test 447 public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 448 notVerified(); // cannot be meaningfully tested, or can it? 449 } 450 451 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14 452 @Override @Test 453 public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 454 notVerified(); // cannot be meaningfully tested, or can it? 455 } 456 457 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15 458 @Override @Test 459 public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 460 notVerified(); // cannot be meaningfully tested, or can it? 461 } 462 463 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16 464 @Override @Test 465 public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 466 notVerified(); // cannot be meaningfully tested, or can it? 467 } 468 469 /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 470 471 /////////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 472 473 abstract class TestStageTestRun { 474 public abstract void run(WhiteboxTestStage stage) throws Throwable; 475 } 476 477 /** 478 * Prepares subscriber and publisher pair (by subscribing the first to the latter), 479 * and then hands over the tests {@link WhiteboxTestStage} over to the test. 480 * 481 * The test stage is, like in a puppet show, used to orchestrate what each participant should do. 482 * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals. 483 */ 484 public void subscriberTest(TestStageTestRun body) throws Throwable { 485 WhiteboxTestStage stage = new WhiteboxTestStage(env, true); 486 body.run(stage); 487 } 488 489 /** 490 * Provides a {@link WhiteboxTestStage} without performing any additional setup, 491 * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would. 492 * 493 * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled. 494 */ 495 public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { 496 WhiteboxTestStage stage = new WhiteboxTestStage(env, false); 497 body.run(stage); 498 } 499 500 /** 501 * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails. 502 */ 503 public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { 504 try { 505 subscriberTestWithoutSetup(body); 506 } catch (Exception ex) { 507 notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement."); 508 } 509 } 510 511 public class WhiteboxTestStage extends ManualPublisher<T> { 512 public Publisher<T> pub; 513 public ManualSubscriber<T> tees; // gives us access to a stream T values 514 public WhiteboxSubscriberProbe<T> probe; 515 516 public T lastT = null; 517 518 public WhiteboxTestStage(TestEnvironment env) throws InterruptedException { 519 this(env, true); 520 } 521 522 public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException { 523 super(env); 524 if (runDefaultInit) { 525 pub = this.createHelperPublisher(Long.MAX_VALUE); 526 tees = env.newManualSubscriber(pub); 527 probe = new WhiteboxSubscriberProbe<T>(env, subscriber); 528 subscribe(createSubscriber(probe)); 529 probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub())); 530 } 531 } 532 533 public Subscriber<? super T> sub() { 534 return subscriber.value(); 535 } 536 537 public SubscriberPuppet puppet() { 538 return probe.puppet(); 539 } 540 541 public WhiteboxSubscriberProbe<T> probe() { 542 return probe; 543 } 544 545 public Publisher<T> createHelperPublisher(long elements) { 546 return SubscriberWhiteboxVerification.this.createHelperPublisher(elements); 547 } 548 549 public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) { 550 return new WhiteboxSubscriberProbe<T>(env, subscriber); 551 } 552 553 public T signalNext() throws InterruptedException { 554 return signalNext(nextT()); 555 } 556 557 private T signalNext(T element) throws InterruptedException { 558 sendNext(element); 559 return element; 560 } 561 562 public T nextT() throws InterruptedException { 563 lastT = tees.requestNextElement(); 564 return lastT; 565 } 566 567 public void verifyNoAsyncErrors() { 568 env.verifyNoAsyncErrors(); 569 } 570 } 571 572 /** 573 * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls, 574 * in order to allow intercepting calls on the underlying {@code Subscriber}. 575 * This delegation allows the proxy to implement {@link BlackboxProbe} assertions. 576 */ 577 public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> { 578 579 public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) { 580 super(env, Promise.<Subscriber<? super T>>completed(env, subscriber)); 581 } 582 583 @Override 584 public void onSubscribe(Subscription s) { 585 sub().onSubscribe(s); 586 } 587 588 @Override 589 public void onNext(T t) { 590 registerOnNext(t); 591 sub().onNext(t); 592 } 593 594 @Override 595 public void onError(Throwable cause) { 596 registerOnError(cause); 597 sub().onError(cause); 598 } 599 600 @Override 601 public void onComplete() { 602 registerOnComplete(); 603 sub().onComplete(); 604 } 605 } 606 607 public static class BlackboxProbe<T> implements SubscriberProbe<T> { 608 protected final TestEnvironment env; 609 protected final Promise<Subscriber<? super T>> subscriber; 610 611 protected final Receptacle<T> elements; 612 protected final Promise<Throwable> error; 613 614 public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) { 615 this.env = env; 616 this.subscriber = subscriber; 617 elements = new Receptacle<T>(env); 618 error = new Promise<Throwable>(env); 619 } 620 621 @Override 622 public void registerOnNext(T element) { 623 elements.add(element); 624 } 625 626 @Override 627 public void registerOnComplete() { 628 try { 629 elements.complete(); 630 } catch (IllegalStateException ex) { 631 // "Queue full", onComplete was already called 632 env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7"); 633 } 634 } 635 636 @Override 637 public void registerOnError(Throwable cause) { 638 try { 639 error.complete(cause); 640 } catch (IllegalStateException ex) { 641 // "Queue full", onError was already called 642 env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7"); 643 } 644 } 645 646 public T expectNext() throws InterruptedException { 647 return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub())); 648 } 649 650 public void expectNext(T expected) throws InterruptedException { 651 expectNext(expected, env.defaultTimeoutMillis()); 652 } 653 654 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 655 T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected)); 656 if (!received.equals(expected)) { 657 env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected)); 658 } 659 } 660 661 public Subscriber<? super T> sub() { 662 return subscriber.value(); 663 } 664 665 public void expectCompletion() throws InterruptedException { 666 expectCompletion(env.defaultTimeoutMillis()); 667 } 668 669 public void expectCompletion(long timeoutMillis) throws InterruptedException { 670 expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub())); 671 } 672 673 public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException { 674 elements.expectCompletion(timeoutMillis, msg); 675 } 676 677 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 678 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException { 679 final E err = expectError(expected); 680 String message = err.getMessage(); 681 assertTrue(message.contains(requiredMessagePart), 682 String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected)); 683 } 684 685 public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException { 686 return expectError(expected, env.defaultTimeoutMillis()); 687 } 688 689 @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) 690 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException { 691 error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 692 if (error.value() == null) { 693 return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 694 } else if (expected.isInstance(error.value())) { 695 return (E) error.value(); 696 } else { 697 return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected)); 698 } 699 } 700 701 public void expectError(Throwable expected) throws InterruptedException { 702 expectError(expected, env.defaultTimeoutMillis()); 703 } 704 705 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 706 public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException { 707 error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 708 if (error.value() != expected) { 709 env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected)); 710 } 711 } 712 713 public void expectNone() throws InterruptedException { 714 expectNone(env.defaultTimeoutMillis()); 715 } 716 717 public void expectNone(long withinMillis) throws InterruptedException { 718 elements.expectNone(withinMillis, "Expected nothing"); 719 } 720 721 } 722 723 public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer { 724 protected Promise<SubscriberPuppet> puppet; 725 726 public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) { 727 super(env, subscriber); 728 puppet = new Promise<SubscriberPuppet>(env); 729 } 730 731 private SubscriberPuppet puppet() { 732 return puppet.value(); 733 } 734 735 @Override 736 public void registerOnSubscribe(SubscriberPuppet p) { 737 if (!puppet.isCompleted()) { 738 puppet.complete(p); 739 } 740 } 741 742 } 743 744 public interface SubscriberPuppeteer { 745 746 /** 747 * Must be called by the test subscriber when it has successfully registered a subscription 748 * inside the `onSubscribe` method. 749 */ 750 void registerOnSubscribe(SubscriberPuppet puppet); 751 } 752 753 public interface SubscriberProbe<T> { 754 755 /** 756 * Must be called by the test subscriber when it has received an`onNext` event. 757 */ 758 void registerOnNext(T element); 759 760 /** 761 * Must be called by the test subscriber when it has received an `onComplete` event. 762 */ 763 void registerOnComplete(); 764 765 /** 766 * Must be called by the test subscriber when it has received an `onError` event. 767 */ 768 void registerOnError(Throwable cause); 769 770 } 771 772 public interface SubscriberPuppet { 773 void triggerRequest(long elements); 774 775 void signalCancel(); 776 } 777 778 public void notVerified() { 779 throw new SkipException("Not verified using this TCK."); 780 } 781 782 public void notVerified(String msg) { 783 throw new SkipException(msg); 784 } 785}