/************************************************************************
 * Licensed under Public Domain (CC0)                                    *
 *                                                                       *
 * To the extent possible under law, the person who associated CC0 with  *
 * this code has waived all copyright and related or neighboring         *
 * rights to this code.                                                  *
 *                                                                       *
 * You should have received a copy of the CC0 legalcode along with this  *
 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
 ************************************************************************/

package org.reactivestreams.tck;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment.*;
import org.reactivestreams.tck.flow.support.Optional;
import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
import org.reactivestreams.tck.flow.support.TestException;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.testng.Assert.assertTrue;

/**
 * Provides whitebox style tests for verifying {@link org.reactivestreams.Subscriber}
 * and {@link org.reactivestreams.Subscription} specification rules.
 *
 * <p>Implementors supply the {@code Subscriber} under test via
 * {@link #createSubscriber(WhiteboxSubscriberProbe)}; that subscriber must report the signals it
 * receives to the given probe so the tests can observe them.
 *
 * @see org.reactivestreams.Subscriber
 * @see org.reactivestreams.Subscription
 */
public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T>
  implements SubscriberWhiteboxVerificationRules {

  private final TestEnvironment env;

  protected SubscriberWhiteboxVerification(TestEnvironment env) {
    this.env = env;
  }

  // USER API

  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
   *
   * In order to be meaningfully testable your Subscriber must inform the given
   * {@link WhiteboxSubscriberProbe} of the respective events having been received.
   *
   * @param probe the probe the created subscriber has to report its received signals to
   * @return a new subscriber instance to run the verification against
   */
  public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);

  // ENV SETUP

  /**
   * Executor service used by the default provided asynchronous Publisher.
   * @see #createHelperPublisher(long)
   */
  private ExecutorService publisherExecutor;

  /** Creates the fixed pool of 4 threads handed out by {@link #publisherExecutorService()}. */
  @BeforeClass
  public void startPublisherExecutorService() {
    publisherExecutor = Executors.newFixedThreadPool(4);
  }

  /** Shuts the executor down again; a no-op if it was never started. */
  @AfterClass
  public void shutdownPublisherExecutorService() {
    if (publisherExecutor != null) {
      publisherExecutor.shutdown();
    }
  }

  @Override
  public ExecutorService publisherExecutorService() {
    return publisherExecutor;
  }

  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////

  /** Clears asynchronous errors recorded in the environment before each test method. */
  @BeforeMethod
  public void setUp() throws Exception {
    env.clearAsyncErrors();
  }

  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////

  /**
   * Sanity check of the test setup itself: requests, receives two elements and cancels,
   * expecting each step to be visible through the probe / stage.
   */
  @Test
  public void required_exerciseWhiteboxHappyPath() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.puppet().triggerRequest(1);

        long receivedRequests = stage.expectRequest();

        stage.signalNext();
        stage.probe.expectNext(stage.lastT);

        stage.puppet().triggerRequest(1);
        // only wait for a further request signal when the first one carried a demand of exactly 1
        if (receivedRequests == 1) {
          stage.expectRequest();
        }

        stage.signalNext();
        stage.probe.expectNext(stage.lastT);

        stage.puppet().signalCancel();
        stage.expectCancelling();

        stage.verifyNoAsyncErrors();
      }
    });
  }

  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
  @Override @Test
  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.expectRequest();

        stage.signalNext();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
  @Override @Test
  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
    subscriberTestWithoutSetup(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {
        final Subscription subs = subscriptionThatMustNotBeCalledFrom("onComplete");

        stage.probe = stage.createWhiteboxSubscriberProbe(env);
        final Subscriber<T> sub = createSubscriber(stage.probe);

        sub.onSubscribe(subs);
        sub.onComplete();

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
    subscriberTestWithoutSetup(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {
        final Subscription subs = subscriptionThatMustNotBeCalledFrom("onError");

        stage.probe = stage.createWhiteboxSubscriberProbe(env);
        final Subscriber<T> sub = createSubscriber(stage.probe);

        sub.onSubscribe(subs);
        sub.onError(new TestException());

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  /**
   * Creates a {@link Subscription} whose {@code request} and {@code cancel} register a test failure
   * when they are invoked with the given {@code Subscriber} method on the call stack (Rule 2.3).
   *
   * @param subscriberMethod name of the {@code Subscriber} method that must not call into the subscription
   */
  private Subscription subscriptionThatMustNotBeCalledFrom(final String subscriberMethod) {
    return new Subscription() {
      @Override
      public void request(long n) {
        flopIfCalledFrom("request", subscriberMethod);
      }

      @Override
      public void cancel() {
        flopIfCalledFrom("cancel", subscriberMethod);
      }
    };
  }

  /**
   * Flops the environment if {@code subscriberMethod} is found in the current stack trace,
   * naming the offending caller in the failure message.
   *
   * @param subscriptionMethod name of the {@code Subscription} method being invoked (used in the message)
   * @param subscriberMethod   name of the {@code Subscriber} method to look for on the stack
   */
  private void flopIfCalledFrom(String subscriptionMethod, String subscriberMethod) {
    final Optional<StackTraceElement> caller = env.findCallerMethodInStackTrace(subscriberMethod);
    if (caller.isDefined()) {
      final StackTraceElement stackElem = caller.get();
      env.flop(String.format("Subscription::%s MUST NOT be called from Subscriber::%s (Rule 2.3)! (Caller: %s::%s line %d)",
                             subscriptionMethod, subscriberMethod,
                             stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
    }
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
  @Override @Test
  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
  @Override @Test
  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {
        // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
        final Latch secondSubscriptionCancelled = new Latch(env);
        final Subscriber<? super T> sub = stage.sub();
        final Subscription subscription = new Subscription() {
          @Override
          public void request(long elements) {
            // ignore...
          }

          @Override
          public void cancel() {
            secondSubscriptionCancelled.close();
          }

          @Override
          public String toString() {
            return "SecondSubscription(should get cancelled)";
          }
        };
        sub.onSubscribe(subscription);

        secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
        env.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
  @Override @Test
  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
  @Override @Test
  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
    // the same thread part of the clause can be verified but that is not very useful, or is it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
  @Override @Test
  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.puppet().signalCancel();
        stage.signalNext();

        stage.puppet().triggerRequest(1);
        stage.puppet().triggerRequest(1);

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.sendCompletion();
        stage.probe.expectCompletion();

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.sendCompletion();
        stage.probe.expectCompletion();

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.puppet().triggerRequest(1);

        Exception ex = new TestException();
        stage.sendError(ex);
        stage.probe.expectError(ex);

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        Exception ex = new TestException();
        stage.sendError(ex);
        stage.probe.expectError(ex);

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
  @Override @Test
  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
  @Override @Test
  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void untested_spec213_failingOnSignalInvocation() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {

        final Subscriber<? super T> sub = stage.sub();
        boolean gotNPE = false;
        try {
          sub.onSubscribe(null);
        } catch (final NullPointerException expected) {
          gotNPE = true;
        }

        assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {

        final Subscriber<? super T> sub = stage.sub();
        boolean gotNPE = false;
        try {
          sub.onNext(null);
        } catch (final NullPointerException expected) {
          gotNPE = true;
        }

        assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {

        final Subscriber<? super T> sub = stage.sub();
        boolean gotNPE = false;
        try {
          sub.onError(null);
        } catch (final NullPointerException expected) {
          gotNPE = true;
        }

        // Asserted after (not inside a `finally` of) the try block, so that an unexpected non-NPE
        // exception thrown by the subscriber propagates as-is instead of being masked.
        assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }


  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
  @Override @Test
  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
  @Override @Test
  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(2);
        stage.probe.expectNext(stage.signalNext());
        stage.probe.expectNext(stage.signalNext());

        stage.probe.expectNone();
        stage.puppet().triggerRequest(3);

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
  @Override @Test
  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
  @Override @Test
  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
  @Override @Test
  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
  @Override @Test
  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
  @Override @Test
  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////

  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////

  /** Body of a single test, executed against a prepared {@link WhiteboxTestStage}. */
  abstract class TestStageTestRun {
    public abstract void run(WhiteboxTestStage stage) throws Throwable;
  }

  /**
   * Prepares subscriber and publisher pair (by subscribing the first to the latter),
   * and then hands over the tests {@link WhiteboxTestStage} over to the test.
   *
   * The test stage is, like in a puppet show, used to orchestrate what each participant should do.
   * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
   */
  public void subscriberTest(TestStageTestRun body) throws Throwable {
    WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
    body.run(stage);
  }

  /**
   * Provides a {@link WhiteboxTestStage} without performing any additional setup,
   * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would.
   *
   * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
   */
  public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
    WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
    body.run(stage);
  }

  /**
   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails
   * with an {@link Exception}.
   */
  public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
    try {
      subscriberTestWithoutSetup(body);
    } catch (Exception ignored) {
      // deliberately converted into a skip: the requirement under test is optional
      notVerified("Skipped because tested subscriber does NOT implement this OPTIONAL requirement.");
    }
  }

  /**
   * Stage on which a whitebox test is played: it is itself the {@link ManualPublisher} the subscriber
   * under test gets subscribed to, and bundles the helper publisher, the element source and the probe.
   */
  public class WhiteboxTestStage extends ManualPublisher<T> {
    public Publisher<T> pub;
    public ManualSubscriber<T> tees; // gives us access to a stream T values
    public WhiteboxSubscriberProbe<T> probe;

    /** The element most recently obtained through {@link #nextT()}; {@code null} before the first call. */
    public T lastT = null;

    public WhiteboxTestStage(TestEnvironment env) throws InterruptedException {
      this(env, true);
    }

    /**
     * @param env            the test environment to run in
     * @param runDefaultInit when {@code true} the helper publisher, probe and subscriber are created and
     *                       the subscriber is subscribed to this stage; when {@code false} {@code pub},
     *                       {@code tees} and {@code probe} are left unset for the test to initialize
     */
    public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
      super(env);
      if (runDefaultInit) {
        pub = this.createHelperPublisher(Long.MAX_VALUE);
        tees = env.newManualSubscriber(pub);
        probe = new WhiteboxSubscriberProbe<T>(env, subscriber);
        subscribe(createSubscriber(probe));
        // the subscriber under test has to hand over its puppet from within onSubscribe
        probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
        env.verifyNoAsyncErrorsNoDelay();
      }
    }

    public Subscriber<? super T> sub() {
      return subscriber.value();
    }

    public SubscriberPuppet puppet() {
      return probe.puppet();
    }

    public WhiteboxSubscriberProbe<T> probe() {
      return probe;
    }

    /** Delegates to the enclosing verification's {@code createHelperPublisher}. */
    public Publisher<T> createHelperPublisher(long elements) {
      return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
    }

    public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) {
      return new WhiteboxSubscriberProbe<T>(env, subscriber);
    }

    /**
     * Obtains the next element via {@link #nextT()} and sends it to the subscriber.
     *
     * @return the element that was sent
     */
    public T signalNext() throws InterruptedException {
      return signalNext(nextT());
    }

    private T signalNext(T element) throws InterruptedException {
      sendNext(element);
      return element;
    }

    /** Requests the next element from {@code tees} and remembers it in {@link #lastT}. */
    public T nextT() throws InterruptedException {
      lastT = tees.requestNextElement();
      return lastT;
    }

    public void verifyNoAsyncErrors() {
      env.verifyNoAsyncErrors();
    }
  }

  /**
   * This class is intended to be used as {@code Subscriber} decorator and should be used in {@code pub.subscribe(...)} calls,
   * in order to allow intercepting calls on the underlying {@code Subscriber}.
   * This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
   */
  public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {

    public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
      super(env, Promise.<Subscriber<? super T>>completed(env, subscriber));
    }

    @Override
    public void onSubscribe(Subscription s) {
      sub().onSubscribe(s);
    }

    @Override
    public void onNext(T t) {
      registerOnNext(t);
      sub().onNext(t);
    }

    @Override
    public void onError(Throwable cause) {
      registerOnError(cause);
      sub().onError(cause);
    }

    @Override
    public void onComplete() {
      registerOnComplete();
      sub().onComplete();
    }
  }

  /**
   * Records the signals a subscriber reports through the {@link SubscriberProbe} callbacks
   * and offers {@code expect*} assertions on them.
   */
  public static class BlackboxProbe<T> implements SubscriberProbe<T> {
    protected final TestEnvironment env;
    protected final Promise<Subscriber<? super T>> subscriber;

    protected final Receptacle<T> elements;
    protected final Promise<Throwable> error;

    public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
      this.env = env;
      this.subscriber = subscriber;
      elements = new Receptacle<T>(env);
      error = new Promise<Throwable>(env);
    }

    @Override
    public void registerOnNext(T element) {
      elements.add(element);
    }

    @Override
    public void registerOnComplete() {
      try {
        elements.complete();
      } catch (IllegalStateException ex) {
        // "Queue full", onComplete was already called
        env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7");
      }
    }

    @Override
    public void registerOnError(Throwable cause) {
      try {
        error.complete(cause);
      } catch (IllegalStateException ex) {
        // "Queue full", onError was already called
        env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7");
      }
    }

    /** Waits up to the environment's default timeout for the next registered element and returns it. */
    public T expectNext() throws InterruptedException {
      return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub()));
    }

    public void expectNext(T expected) throws InterruptedException {
      expectNext(expected, env.defaultTimeoutMillis());
    }

    /** Flops the environment if the next registered element is not {@code equals} to {@code expected}. */
    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
      T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
      if (!received.equals(expected)) {
        env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
      }
    }

    public Subscriber<? super T> sub() {
      return subscriber.value();
    }

    public void expectCompletion() throws InterruptedException {
      expectCompletion(env.defaultTimeoutMillis());
    }

    public void expectCompletion(long timeoutMillis) throws InterruptedException {
      expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
    }

    public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException {
      elements.expectCompletion(timeoutMillis, msg);
    }

    /**
     * Expects an error of the given type whose message contains {@code requiredMessagePart}.
     * An error without any message fails the assertion (rather than causing a NullPointerException),
     * and the failure text reports the message that was actually received.
     */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
      final E err = expectError(expected);
      final String message = err.getMessage();
      assertTrue(message != null && message.contains(requiredMessagePart),
                 String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, message));
    }

    public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException {
      return expectError(expected, env.defaultTimeoutMillis());
    }

    /**
     * Waits for an error to be registered and returns it if it is an instance of {@code expected};
     * a missing or differently typed error is reported through {@code env.flopAndFail}.
     */
    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
      if (error.value() == null) {
        return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
      } else if (expected.isInstance(error.value())) {
        return (E) error.value();
      } else {
        return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
      }
    }

    public void expectError(Throwable expected) throws InterruptedException {
      expectError(expected, env.defaultTimeoutMillis());
    }

    /** Flops the environment unless the registered error is the very same instance as {@code expected}. */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
      if (error.value() != expected) {
        env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
      }
    }

    public void expectNone() throws InterruptedException {
      expectNone(env.defaultNoSignalsTimeoutMillis());
    }

    public void expectNone(long withinMillis) throws InterruptedException {
      elements.expectNone(withinMillis, "Expected nothing");
    }

  }

  /**
   * A {@link BlackboxProbe} that additionally receives the {@link SubscriberPuppet}
   * the subscriber under test registers from within {@code onSubscribe}.
   */
  public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
    protected Promise<SubscriberPuppet> puppet;

    public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
      super(env, subscriber);
      puppet = new Promise<SubscriberPuppet>(env);
    }

    private SubscriberPuppet puppet() {
      return puppet.value();
    }

    @Override
    public void registerOnSubscribe(SubscriberPuppet p) {
      // only the first registered puppet is kept; later registrations are ignored
      if (!puppet.isCompleted()) {
        puppet.complete(p);
      }
    }

  }

  public interface SubscriberPuppeteer {

    /**
     * Must be called by the test subscriber when it has successfully registered a subscription
     * inside the `onSubscribe` method.
     */
    void registerOnSubscribe(SubscriberPuppet puppet);
  }

  public interface SubscriberProbe<T> {

    /**
     * Must be called by the test subscriber when it has received an `onNext` event.
     */
    void registerOnNext(T element);

    /**
     * Must be called by the test subscriber when it has received an `onComplete` event.
     */
    void registerOnComplete();

    /**
     * Must be called by the test subscriber when it has received an `onError` event.
     */
    void registerOnError(Throwable cause);

  }

  /**
   * Implement this puppet in your Whitebox style tests.
   * The test suite will invoke the specific trigger/signal methods requesting you to execute the specific action.
   * Since this is a whitebox style test, you're allowed and expected to use knowledge about your implementation to
   * implement these calls.
   */
  public interface SubscriberPuppet {
    /**
     * Trigger {@code request(elements)} on your {@link Subscriber}
     */
    void triggerRequest(long elements);

    /**
     * Trigger {@code cancel()} on your {@link Subscriber}
     */
    void signalCancel();
  }

  /** Marks the current test as skipped, since the rule is not verified by this TCK. */
  public void notVerified() {
    throw new SkipException("Not verified using this TCK.");
  }

  /** Marks the current test as skipped with the given message. */
  public void notVerified(String msg) {
    throw new SkipException(msg);
  }
}