/************************************************************************
 * Licensed under Public Domain (CC0)                                    *
 *                                                                       *
 * To the extent possible under law, the person who associated CC0 with  *
 * this code has waived all copyright and related or neighboring         *
 * rights to this code.                                                  *
 *                                                                       *
 * You should have received a copy of the CC0 legalcode along with this  *
 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
 ************************************************************************/

package org.reactivestreams.tck;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment.*;
import org.reactivestreams.tck.flow.support.Optional;
import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
import org.reactivestreams.tck.flow.support.TestException;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.testng.Assert.assertTrue;

/**
 * Provides whitebox style tests for verifying {@link org.reactivestreams.Subscriber}
 * and {@link org.reactivestreams.Subscription} specification rules.
 *
 * @see org.reactivestreams.Subscriber
 * @see org.reactivestreams.Subscription
 */
public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T>
  implements SubscriberWhiteboxVerificationRules {

  private final TestEnvironment env;

  protected SubscriberWhiteboxVerification(TestEnvironment env) {
    this.env = env;
  }

  // USER API

  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
   *
   * In order to be meaningfully testable your Subscriber must inform the given
   * {@link WhiteboxSubscriberProbe} of the respective events having been received.
   */
  public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);

  // ENV SETUP

  /**
   * Executor service used by the default provided asynchronous Publisher.
   * @see #createHelperPublisher(long)
   */
  private ExecutorService publisherExecutor;

  /** Creates the fixed pool of 4 threads handed out by {@link #publisherExecutorService()}. */
  @BeforeClass
  public void startPublisherExecutorService() {
    publisherExecutor = Executors.newFixedThreadPool(4);
  }

  /** Shuts the executor down; a no-op if {@link #startPublisherExecutorService()} never ran. */
  @AfterClass
  public void shutdownPublisherExecutorService() {
    if (publisherExecutor != null) {
      publisherExecutor.shutdown();
    }
  }

  @Override
  public ExecutorService publisherExecutorService() {
    return publisherExecutor;
  }

  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////

  /** Clears the async errors recorded in the environment so that tests do not see each other's failures. */
  @BeforeMethod
  public void setUp() throws Exception {
    env.clearAsyncErrors();
  }

  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////

  @Test
  public void required_exerciseWhiteboxHappyPath() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.puppet().triggerRequest(1);

        long receivedRequests = stage.expectRequest();

        stage.signalNext();
        stage.probe.expectNext(stage.lastT);

        stage.puppet().triggerRequest(1);
        // A subscriber that requests one element at a time issues its next request only now.
        if (receivedRequests == 1) {
          stage.expectRequest();
        }

        stage.signalNext();
        stage.probe.expectNext(stage.lastT);

        stage.puppet().signalCancel();
        stage.expectCancelling();

        stage.verifyNoAsyncErrors();
      }
    });
  }

  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
  @Override @Test
  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.expectRequest();

        stage.signalNext();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
  @Override @Test
  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
    subscriberTestWithoutSetup(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {
        final Subscription subs = subscriptionFloppingWhenCalledFrom("onComplete");

        stage.probe = stage.createWhiteboxSubscriberProbe(env);
        final Subscriber<T> sub = createSubscriber(stage.probe);

        sub.onSubscribe(subs);
        sub.onComplete();

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
    subscriberTestWithoutSetup(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {
        final Subscription subs = subscriptionFloppingWhenCalledFrom("onError");

        stage.probe = stage.createWhiteboxSubscriberProbe(env);
        final Subscriber<T> sub = createSubscriber(stage.probe);

        sub.onSubscribe(subs);
        sub.onError(new TestException());

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  /**
   * Creates a {@link Subscription} whose {@code request} and {@code cancel} report a Rule 2.3 violation
   * through {@code env.flop} when a method named {@code subscriberSignal} is found on the call stack.
   *
   * @param subscriberSignal name of the {@code Subscriber} method from which calls are forbidden,
   *                         e.g. {@code "onComplete"} or {@code "onError"}
   */
  private Subscription subscriptionFloppingWhenCalledFrom(final String subscriberSignal) {
    return new Subscription() {
      @Override
      public void request(long n) {
        flopIfCalledFrom("request", subscriberSignal);
      }

      @Override
      public void cancel() {
        flopIfCalledFrom("cancel", subscriberSignal);
      }
    };
  }

  /**
   * Reports a Rule 2.3 violation if a method named {@code subscriberSignal} is on the current call stack.
   *
   * @param subscriptionMethod name of the {@code Subscription} method that was invoked (used in the message only)
   * @param subscriberSignal   name of the {@code Subscriber} method to look for on the stack
   */
  private void flopIfCalledFrom(String subscriptionMethod, String subscriberSignal) {
    final Optional<StackTraceElement> caller = env.findCallerMethodInStackTrace(subscriberSignal);
    if (caller.isDefined()) {
      final StackTraceElement stackElem = caller.get();
      env.flop(String.format("Subscription::%s MUST NOT be called from Subscriber::%s (Rule 2.3)! (Caller: %s::%s line %d)",
                             subscriptionMethod, subscriberSignal,
                             stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
    }
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
  @Override @Test
  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
  @Override @Test
  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {
        // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
        final Latch secondSubscriptionCancelled = new Latch(env);
        final Subscriber<? super T> sub = stage.sub();
        final Subscription subscription = new Subscription() {
          @Override
          public void request(long elements) {
            // ignore...
          }

          @Override
          public void cancel() {
            secondSubscriptionCancelled.close();
          }

          @Override
          public String toString() {
            return "SecondSubscription(should get cancelled)";
          }
        };
        sub.onSubscribe(subscription);

        secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
        env.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
  @Override @Test
  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
  @Override @Test
  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
    // the same thread part of the clause can be verified but that is not very useful, or is it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
  @Override @Test
  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.expectRequest();
        stage.puppet().signalCancel();
        stage.expectCancelling();
        stage.signalNext();

        stage.puppet().triggerRequest(1);
        stage.puppet().triggerRequest(1);

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.sendCompletion();
        stage.probe.expectCompletion();

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.sendCompletion();
        stage.probe.expectCompletion();

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(1);
        stage.puppet().triggerRequest(1);

        Exception ex = new TestException();
        stage.sendError(ex);
        stage.probe.expectError(ex);

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        Exception ex = new TestException();
        stage.sendError(ex);
        stage.probe.expectError(ex);

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
  @Override @Test
  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
  @Override @Test
  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void untested_spec213_failingOnSignalInvocation() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {

        final Subscriber<? super T> sub = stage.sub();
        boolean gotNPE = false;
        try {
          sub.onSubscribe(null);
        } catch (final NullPointerException expected) {
          gotNPE = true;
        }

        assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {

        final Subscriber<? super T> sub = stage.sub();
        boolean gotNPE = false;
        try {
          sub.onNext(null);
        } catch (final NullPointerException expected) {
          gotNPE = true;
        }

        assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  @Override @Test
  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws Throwable {

        final Subscriber<? super T> sub = stage.sub();
        boolean gotNPE = false;
        try {
          sub.onError(null);
        } catch (final NullPointerException expected) {
          gotNPE = true;
        }

        // Asserted after (not inside a `finally` of) the try block, so that an unexpected non-NPE exception
        // thrown by `onError(null)` propagates as-is instead of being masked by this assertion.
        assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }


  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
  @Override @Test
  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
  @Override @Test
  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
    subscriberTest(new TestStageTestRun() {
      @Override
      public void run(WhiteboxTestStage stage) throws InterruptedException {
        stage.puppet().triggerRequest(2);
        long requestedElements = stage.expectRequest();
        stage.probe.expectNext(stage.signalNext());
        // Some subscribers may only request one element at a time.
        if (requestedElements < 2) {
          stage.expectRequest();
        }
        stage.probe.expectNext(stage.signalNext());

        stage.probe.expectNone();
        stage.puppet().triggerRequest(3);

        stage.verifyNoAsyncErrors();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
  @Override @Test
  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
  @Override @Test
  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
  @Override @Test
  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
  @Override @Test
  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
  @Override @Test
  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }

  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////

  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////

  /** Body of a single test, executed against the {@link WhiteboxTestStage} handed to {@link #run}. */
  abstract class TestStageTestRun {
    public abstract void run(WhiteboxTestStage stage) throws Throwable;
  }

  /**
   * Prepares subscriber and publisher pair (by subscribing the first to the latter),
   * and then hands over the tests {@link WhiteboxTestStage} over to the test.
   *
   * The test stage is, like in a puppet show, used to orchestrate what each participant should do.
   * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
   */
  public void subscriberTest(TestStageTestRun body) throws Throwable {
    WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
    body.run(stage);
  }

  /**
   * Provides a {@link WhiteboxTestStage} without performing any additional setup,
   * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would.
   *
   * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
   */
  public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
    WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
    body.run(stage);
  }

  /**
   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails with an
   * {@link Exception}; the exception is attached to the {@link SkipException} as its cause.
   * Only {@code Exception}s are converted: {@link Error}s (for example {@link AssertionError}) still propagate.
   */
  public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
    try {
      subscriberTestWithoutSetup(body);
    } catch (Exception ex) {
      throw new SkipException("Skipped because tested subscriber does NOT implement this OPTIONAL requirement.", ex);
    }
  }

  /**
   * The stage on which a whitebox test is played: a {@code ManualPublisher} to which the subscriber under test
   * gets subscribed, plus the {@link WhiteboxSubscriberProbe} through which that subscriber reports back.
   */
  public class WhiteboxTestStage extends ManualPublisher<T> {
    public Publisher<T> pub;
    public ManualSubscriber<T> tees; // gives us access to a stream T values
    public WhiteboxSubscriberProbe<T> probe;

    /** The element most recently obtained through {@link #nextT()}; {@code null} until then. */
    public T lastT = null;

    public WhiteboxTestStage(TestEnvironment env) throws InterruptedException {
      this(env, true);
    }

    /**
     * @param runDefaultInit when {@code true}, creates the helper publisher, the probe and the subscriber under
     *                       test, subscribes it to this stage and waits for it to call {@code registerOnSubscribe};
     *                       when {@code false}, {@code pub}, {@code tees} and {@code probe} are left unset
     */
    public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
      super(env);
      if (runDefaultInit) {
        pub = this.createHelperPublisher(Long.MAX_VALUE);
        tees = env.newManualSubscriber(pub);
        probe = new WhiteboxSubscriberProbe<T>(env, subscriber);
        subscribe(createSubscriber(probe));
        probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
        env.verifyNoAsyncErrorsNoDelay();
      }
    }

    public Subscriber<? super T> sub() {
      return subscriber.value();
    }

    public SubscriberPuppet puppet() {
      return probe.puppet();
    }

    public WhiteboxSubscriberProbe<T> probe() {
      return probe;
    }

    /** Delegates to the enclosing verification's {@code createHelperPublisher}. */
    public Publisher<T> createHelperPublisher(long elements) {
      return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
    }

    public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) {
      return new WhiteboxSubscriberProbe<T>(env, subscriber);
    }

    /** Obtains the next element via {@link #nextT()}, sends it to the subscriber and returns it. */
    public T signalNext() throws InterruptedException {
      return signalNext(nextT());
    }

    private T signalNext(T element) throws InterruptedException {
      sendNext(element);
      return element;
    }

    /** Requests the next element from {@code tees} and remembers it in {@link #lastT}. */
    public T nextT() throws InterruptedException {
      lastT = tees.requestNextElement();
      return lastT;
    }

    public void verifyNoAsyncErrors() {
      env.verifyNoAsyncErrors();
    }
  }

  /**
   * This class is intended to be used as {@code Subscriber} decorator and should be used in {@code pub.subscribe(...)} calls,
   * in order to allow intercepting calls on the underlying {@code Subscriber}.
   * This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
   */
  public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {

    public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
      super(env, Promise.<Subscriber<? super T>>completed(env, subscriber));
    }

    @Override
    public void onSubscribe(Subscription s) {
      sub().onSubscribe(s);
    }

    @Override
    public void onNext(T t) {
      registerOnNext(t);
      sub().onNext(t);
    }

    @Override
    public void onError(Throwable cause) {
      registerOnError(cause);
      sub().onError(cause);
    }

    @Override
    public void onComplete() {
      registerOnComplete();
      sub().onComplete();
    }
  }

  /**
   * Records the {@code onNext} / {@code onComplete} / {@code onError} signals registered by a subscriber and
   * offers {@code expect*} methods to assert on them.
   */
  public static class BlackboxProbe<T> implements SubscriberProbe<T> {
    protected final TestEnvironment env;
    protected final Promise<Subscriber<? super T>> subscriber;

    protected final Receptacle<T> elements;
    protected final Promise<Throwable> error;

    public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
      this.env = env;
      this.subscriber = subscriber;
      elements = new Receptacle<T>(env);
      error = new Promise<Throwable>(env);
    }

    @Override
    public void registerOnNext(T element) {
      elements.add(element);
    }

    @Override
    public void registerOnComplete() {
      try {
        elements.complete();
      } catch (IllegalStateException ex) {
        // "Queue full", onComplete was already called
        env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7");
      }
    }

    @Override
    public void registerOnError(Throwable cause) {
      try {
        error.complete(cause);
      } catch (IllegalStateException ex) {
        // "Queue full", onError was already called
        env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7");
      }
    }

    /** Waits up to the environment's default timeout for the next registered element and returns it. */
    public T expectNext() throws InterruptedException {
      return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub()));
    }

    public void expectNext(T expected) throws InterruptedException {
      expectNext(expected, env.defaultTimeoutMillis());
    }

    /** Waits for the next registered element and flops if it does not {@code equals} {@code expected}. */
    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
      T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
      if (!received.equals(expected)) {
        env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
      }
    }

    public Subscriber<? super T> sub() {
      return subscriber.value();
    }

    public void expectCompletion() throws InterruptedException {
      expectCompletion(env.defaultTimeoutMillis());
    }

    public void expectCompletion(long timeoutMillis) throws InterruptedException {
      expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
    }

    public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException {
      elements.expectCompletion(timeoutMillis, msg);
    }

    /**
     * Expects an error of type {@code expected} whose message contains {@code requiredMessagePart}.
     * An error without a message ({@code getMessage() == null}) fails the assertion rather than
     * causing a {@code NullPointerException}.
     */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
      final E err = expectError(expected);
      final String message = err.getMessage();
      // Report the actual message (not the expected class) after "was:", so that the failure is diagnosable.
      assertTrue(message != null && message.contains(requiredMessagePart),
                 String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, message));
    }

    public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException {
      return expectError(expected, env.defaultTimeoutMillis());
    }

    /** Waits for a registered error and returns it if it is an instance of {@code expected}; fails otherwise. */
    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
      if (error.value() == null) {
        return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
      } else if (expected.isInstance(error.value())) {
        return (E) error.value();
      } else {
        return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
      }
    }

    public void expectError(Throwable expected) throws InterruptedException {
      expectError(expected, env.defaultTimeoutMillis());
    }

    /** Waits for a registered error and flops unless it is the very same instance as {@code expected}. */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
      if (error.value() != expected) {
        env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
      }
    }

    public void expectNone() throws InterruptedException {
      expectNone(env.defaultNoSignalsTimeoutMillis());
    }

    public void expectNone(long withinMillis) throws InterruptedException {
      elements.expectNone(withinMillis, "Expected nothing");
    }

  }

  /**
   * A {@link BlackboxProbe} which additionally holds the {@link SubscriberPuppet} that the subscriber under test
   * hands over through {@link #registerOnSubscribe(SubscriberPuppet)}.
   */
  public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
    protected Promise<SubscriberPuppet> puppet;

    public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
      super(env, subscriber);
      puppet = new Promise<SubscriberPuppet>(env);
    }

    private SubscriberPuppet puppet() {
      return puppet.value();
    }

    /** Stores the given puppet; calls made after the puppet promise has been completed are ignored. */
    @Override
    public void registerOnSubscribe(SubscriberPuppet p) {
      if (!puppet.isCompleted()) {
        puppet.complete(p);
      }
    }

  }

  public interface SubscriberPuppeteer {

    /**
     * Must be called by the test subscriber when it has successfully registered a subscription
     * inside the `onSubscribe` method.
     */
    void registerOnSubscribe(SubscriberPuppet puppet);
  }

  public interface SubscriberProbe<T> {

    /**
     * Must be called by the test subscriber when it has received an `onNext` event.
     */
    void registerOnNext(T element);

    /**
     * Must be called by the test subscriber when it has received an `onComplete` event.
     */
    void registerOnComplete();

    /**
     * Must be called by the test subscriber when it has received an `onError` event.
     */
    void registerOnError(Throwable cause);

  }

  /**
   * Implement this puppet in your Whitebox style tests.
   * The test suite will invoke the specific trigger/signal methods requesting you to execute the specific action.
   * Since this is a whitebox style test, you're allowed and expected to use knowledge about your implementation to
   * implement these calls.
   */
  public interface SubscriberPuppet {

    /**
     * Ensure that at least {@code elements} are eventually requested by your {@link Subscriber}, if it hasn't already
     * requested that many elements.
     * <p>
     * This does not necessarily have to correlate 1:1 with a {@code Subscription.request(elements)} call, but the sum
     * of the elements requested by your {@code Subscriber} must eventually be at least the sum of the elements
     * triggered to be requested by all the invocations of this method.
     * <p>
     * Additionally, subscribers are permitted to delay requesting elements until previous requests for elements have
     * been fulfilled. For example, a subscriber that only requests one element at a time may fulfill the request made
     * by this method by requesting one element {@code elements} times, waiting for each element to arrive before the
     * next request is made.
     * <p>
     * Before sending any element to the subscriber, the TCK must wait for the subscriber to request that element, and
     * must be prepared for the subscriber to only request one element at a time, it is not enough for the TCK to
     * simply invoke this method before sending elements.
     * <p>
     * An invocation of {@link #signalCancel()} may be coalesced into any elements that have not yet been requested,
     * such that only a cancel signal is emitted.
     */
    void triggerRequest(long elements);

    /**
     * Trigger {@code cancel()} on your {@link Subscriber}.
     * <p>
     * An invocation of this method may be coalesced into any outstanding requests, as requested by
     * {@link #triggerRequest(long)}, such that only a cancel signal is emitted.
     */
    void signalCancel();
  }

  /** Marks the calling test as skipped with a generic message by throwing a {@link SkipException}. */
  public void notVerified() {
    throw new SkipException("Not verified using this TCK.");
  }

  /** Marks the calling test as skipped with the given message by throwing a {@link SkipException}. */
  public void notVerified(String msg) {
    throw new SkipException(msg);
  }
}