001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Publisher; 015import org.reactivestreams.Subscriber; 016import org.reactivestreams.Subscription; 017import org.reactivestreams.tck.TestEnvironment.*; 018import org.reactivestreams.tck.support.Optional; 019import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules; 020import org.reactivestreams.tck.support.TestException; 021import org.testng.SkipException; 022import org.testng.annotations.AfterClass; 023import org.testng.annotations.BeforeClass; 024import org.testng.annotations.BeforeMethod; 025import org.testng.annotations.Test; 026 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029 030import static org.testng.Assert.assertTrue; 031 032/** 033 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} specification rules. 034 * 035 * @see org.reactivestreams.Subscriber 036 * @see org.reactivestreams.Subscription 037 */ 038public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T> 039 implements SubscriberWhiteboxVerificationRules { 040 041 private final TestEnvironment env; 042 043 protected SubscriberWhiteboxVerification(TestEnvironment env) { 044 this.env = env; 045 } 046 047 // USER API 048 049 /** 050 * This is the main method you must implement in your test incarnation. 051 * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. 052 * 053 * In order to be meaningfully testable your Subscriber must inform the given 054 * `WhiteboxSubscriberProbe` of the respective events having been received. 055 */ 056 public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe); 057 058 // ENV SETUP 059 060 /** 061 * Executor service used by the default provided asynchronous Publisher. 062 * @see #createHelperPublisher(long) 063 */ 064 private ExecutorService publisherExecutor; 065 @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } 066 @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } 067 @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } 068 069 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 070 071 @BeforeMethod 072 public void setUp() throws Exception { 073 env.clearAsyncErrors(); 074 } 075 076 ////////////////////// TEST SETUP VERIFICATION ////////////////////////////// 077 078 @Test 079 public void required_exerciseWhiteboxHappyPath() throws Throwable { 080 subscriberTest(new TestStageTestRun() { 081 @Override 082 public void run(WhiteboxTestStage stage) throws InterruptedException { 083 stage.puppet().triggerRequest(1); 084 stage.puppet().triggerRequest(1); 085 086 long receivedRequests = stage.expectRequest(); 087 088 stage.signalNext(); 089 stage.probe.expectNext(stage.lastT); 090 091 stage.puppet().triggerRequest(1); 092 if (receivedRequests == 1) { 093 stage.expectRequest(); 094 } 095 096 stage.signalNext(); 097 stage.probe.expectNext(stage.lastT); 098 099 stage.puppet().signalCancel(); 100 stage.expectCancelling(); 101 102 stage.verifyNoAsyncErrors(); 103 } 104 }); 105 } 106 107 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 108 109 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1 110 @Override @Test 111 public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable { 112 subscriberTest(new TestStageTestRun() { 113 @Override 114 public void run(WhiteboxTestStage stage) throws InterruptedException { 115 stage.puppet().triggerRequest(1); 116 stage.expectRequest(); 117 118 stage.signalNext(); 119 } 120 }); 121 } 122 123 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2 124 @Override @Test 125 public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception { 126 notVerified(); // cannot be meaningfully tested, or can it? 127 } 128 129 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 130 @Override @Test 131 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 132 subscriberTestWithoutSetup(new TestStageTestRun() { 133 @Override 134 public void run(WhiteboxTestStage stage) throws Throwable { 135 final Subscription subs = new Subscription() { 136 @Override 137 public void request(long n) { 138 final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete"); 139 if (onCompleteStackTraceElement.isDefined()) { 140 final StackTraceElement stackElem = onCompleteStackTraceElement.get(); 141 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 142 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 143 } 144 } 145 146 @Override 147 public void cancel() { 148 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete"); 149 if (onCompleteStackElement.isDefined()) { 150 final StackTraceElement stackElem = onCompleteStackElement.get(); 151 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 152 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 153 } 154 } 155 }; 156 157 stage.probe = stage.createWhiteboxSubscriberProbe(env); 158 final Subscriber<T> sub = createSubscriber(stage.probe); 159 160 sub.onSubscribe(subs); 161 sub.onComplete(); 162 163 env.verifyNoAsyncErrorsNoDelay(); 164 } 165 }); 166 } 167 168 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 169 @Override @Test 170 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 171 subscriberTestWithoutSetup(new TestStageTestRun() { 172 @Override 173 public void run(WhiteboxTestStage stage) throws Throwable { 174 final Subscription subs = new Subscription() { 175 @Override 176 public void request(long n) { 177 Throwable thr = new Throwable(); 178 for (StackTraceElement stackElem : thr.getStackTrace()) { 179 if (stackElem.getMethodName().equals("onError")) { 180 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 181 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 182 } 183 } 184 } 185 186 @Override 187 public void cancel() { 188 Throwable thr = new Throwable(); 189 for (StackTraceElement stackElem : thr.getStackTrace()) { 190 if (stackElem.getMethodName().equals("onError")) { 191 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 192 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 193 } 194 } 195 } 196 }; 197 198 stage.probe = stage.createWhiteboxSubscriberProbe(env); 199 final Subscriber<T> sub = createSubscriber(stage.probe); 200 201 sub.onSubscribe(subs); 202 sub.onError(new TestException()); 203 204 env.verifyNoAsyncErrorsNoDelay(); 205 } 206 }); 207 } 208 209 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4 210 @Override @Test 211 public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 212 notVerified(); // cannot be meaningfully tested, or can it? 213 } 214 215 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5 216 @Override @Test 217 public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { 218 subscriberTest(new TestStageTestRun() { 219 @Override 220 public void run(WhiteboxTestStage stage) throws Throwable { 221 // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail 222 final Latch secondSubscriptionCancelled = new Latch(env); 223 final Subscriber<? super T> sub = stage.sub(); 224 final Subscription subscription = new Subscription() { 225 @Override 226 public void request(long elements) { 227 // ignore... 228 } 229 230 @Override 231 public void cancel() { 232 secondSubscriptionCancelled.close(); 233 } 234 235 @Override 236 public String toString() { 237 return "SecondSubscription(should get cancelled)"; 238 } 239 }; 240 sub.onSubscribe(subscription); 241 242 secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called"); 243 env.verifyNoAsyncErrors(); 244 } 245 }); 246 } 247 248 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6 249 @Override @Test 250 public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 251 notVerified(); // cannot be meaningfully tested, or can it? 252 } 253 254 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7 255 @Override @Test 256 public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 257 notVerified(); // cannot be meaningfully tested, or can it? 258 // the same thread part of the clause can be verified but that is not very useful, or is it? 259 } 260 261 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8 262 @Override @Test 263 public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 264 subscriberTest(new TestStageTestRun() { 265 @Override 266 public void run(WhiteboxTestStage stage) throws InterruptedException { 267 stage.puppet().triggerRequest(1); 268 stage.puppet().signalCancel(); 269 stage.signalNext(); 270 271 stage.puppet().triggerRequest(1); 272 stage.puppet().triggerRequest(1); 273 274 stage.verifyNoAsyncErrors(); 275 } 276 }); 277 } 278 279 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 280 @Override @Test 281 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 282 subscriberTest(new TestStageTestRun() { 283 @Override 284 public void run(WhiteboxTestStage stage) throws InterruptedException { 285 stage.puppet().triggerRequest(1); 286 stage.sendCompletion(); 287 stage.probe.expectCompletion(); 288 289 stage.verifyNoAsyncErrors(); 290 } 291 }); 292 } 293 294 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 295 @Override @Test 296 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 297 subscriberTest(new TestStageTestRun() { 298 @Override 299 public void run(WhiteboxTestStage stage) throws InterruptedException { 300 stage.sendCompletion(); 301 stage.probe.expectCompletion(); 302 303 stage.verifyNoAsyncErrors(); 304 } 305 }); 306 } 307 308 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 309 @Override @Test 310 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 311 subscriberTest(new TestStageTestRun() { 312 @Override 313 public void run(WhiteboxTestStage stage) throws InterruptedException { 314 stage.puppet().triggerRequest(1); 315 stage.puppet().triggerRequest(1); 316 317 Exception ex = new TestException(); 318 stage.sendError(ex); 319 stage.probe.expectError(ex); 320 321 env.verifyNoAsyncErrorsNoDelay(); 322 } 323 }); 324 } 325 326 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 327 @Override @Test 328 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 329 subscriberTest(new TestStageTestRun() { 330 @Override 331 public void run(WhiteboxTestStage stage) throws InterruptedException { 332 Exception ex = new TestException(); 333 stage.sendError(ex); 334 stage.probe.expectError(ex); 335 336 env.verifyNoAsyncErrorsNoDelay(); 337 } 338 }); 339 } 340 341 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11 342 @Override @Test 343 public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 344 notVerified(); // cannot be meaningfully tested, or can it? 345 } 346 347 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12 348 @Override @Test 349 public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable { 350 notVerified(); // cannot be meaningfully tested, or can it? 351 } 352 353 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 354 @Override @Test 355 public void untested_spec213_failingOnSignalInvocation() throws Exception { 356 notVerified(); // cannot be meaningfully tested, or can it? 357 } 358 359 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 360 @Override @Test 361 public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 362 subscriberTest(new TestStageTestRun() { 363 @Override 364 public void run(WhiteboxTestStage stage) throws Throwable { 365 366 final Subscriber<? super T> sub = stage.sub(); 367 boolean gotNPE = false; 368 try { 369 sub.onSubscribe(null); 370 } catch (final NullPointerException expected) { 371 gotNPE = true; 372 } 373 374 assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); 375 env.verifyNoAsyncErrorsNoDelay(); 376 } 377 }); 378 } 379 380 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 381 @Override @Test 382 public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 383 subscriberTest(new TestStageTestRun() { 384 @Override 385 public void run(WhiteboxTestStage stage) throws Throwable { 386 387 final Subscriber<? super T> sub = stage.sub(); 388 boolean gotNPE = false; 389 try { 390 sub.onNext(null); 391 } catch (final NullPointerException expected) { 392 gotNPE = true; 393 } 394 395 assertTrue(gotNPE, "onNext(null) did not throw NullPointerException"); 396 env.verifyNoAsyncErrorsNoDelay(); 397 } 398 }); 399 } 400 401 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 402 @Override @Test 403 public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 404 subscriberTest(new TestStageTestRun() { 405 @Override 406 public void run(WhiteboxTestStage stage) throws Throwable { 407 408 final Subscriber<? super T> sub = stage.sub(); 409 boolean gotNPE = false; 410 try { 411 sub.onError(null); 412 } catch (final NullPointerException expected) { 413 gotNPE = true; 414 } finally { 415 assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); 416 } 417 418 env.verifyNoAsyncErrorsNoDelay(); 419 } 420 }); 421 } 422 423 424 ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION ////////////////// 425 426 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1 427 @Override @Test 428 public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception { 429 notVerified(); // cannot be meaningfully tested, or can it? 430 } 431 432 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8 433 @Override @Test 434 public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 435 subscriberTest(new TestStageTestRun() { 436 @Override 437 public void run(WhiteboxTestStage stage) throws InterruptedException { 438 stage.puppet().triggerRequest(2); 439 stage.probe.expectNext(stage.signalNext()); 440 stage.probe.expectNext(stage.signalNext()); 441 442 stage.probe.expectNone(); 443 stage.puppet().triggerRequest(3); 444 445 stage.verifyNoAsyncErrors(); 446 } 447 }); 448 } 449 450 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10 451 @Override @Test 452 public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 453 notVerified(); // cannot be meaningfully tested, or can it? 454 } 455 456 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11 457 @Override @Test 458 public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 459 notVerified(); // cannot be meaningfully tested, or can it? 460 } 461 462 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14 463 @Override @Test 464 public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 465 notVerified(); // cannot be meaningfully tested, or can it? 466 } 467 468 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15 469 @Override @Test 470 public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 471 notVerified(); // cannot be meaningfully tested, or can it? 472 } 473 474 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16 475 @Override @Test 476 public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 477 notVerified(); // cannot be meaningfully tested, or can it? 478 } 479 480 /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 481 482 /////////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 483 484 abstract class TestStageTestRun { 485 public abstract void run(WhiteboxTestStage stage) throws Throwable; 486 } 487 488 /** 489 * Prepares subscriber and publisher pair (by subscribing the first to the latter), 490 * and then hands over the tests {@link WhiteboxTestStage} over to the test. 491 * 492 * The test stage is, like in a puppet show, used to orchestrate what each participant should do. 493 * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals. 494 */ 495 public void subscriberTest(TestStageTestRun body) throws Throwable { 496 WhiteboxTestStage stage = new WhiteboxTestStage(env, true); 497 body.run(stage); 498 } 499 500 /** 501 * Provides a {@link WhiteboxTestStage} without performing any additional setup, 502 * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would. 503 * 504 * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled. 505 */ 506 public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { 507 WhiteboxTestStage stage = new WhiteboxTestStage(env, false); 508 body.run(stage); 509 } 510 511 /** 512 * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails. 513 */ 514 public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable { 515 try { 516 subscriberTestWithoutSetup(body); 517 } catch (Exception ex) { 518 notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement."); 519 } 520 } 521 522 public class WhiteboxTestStage extends ManualPublisher<T> { 523 public Publisher<T> pub; 524 public ManualSubscriber<T> tees; // gives us access to a stream T values 525 public WhiteboxSubscriberProbe<T> probe; 526 527 public T lastT = null; 528 529 public WhiteboxTestStage(TestEnvironment env) throws InterruptedException { 530 this(env, true); 531 } 532 533 public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException { 534 super(env); 535 if (runDefaultInit) { 536 pub = this.createHelperPublisher(Long.MAX_VALUE); 537 tees = env.newManualSubscriber(pub); 538 probe = new WhiteboxSubscriberProbe<T>(env, subscriber); 539 subscribe(createSubscriber(probe)); 540 probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub())); 541 } 542 } 543 544 public Subscriber<? super T> sub() { 545 return subscriber.value(); 546 } 547 548 public SubscriberPuppet puppet() { 549 return probe.puppet(); 550 } 551 552 public WhiteboxSubscriberProbe<T> probe() { 553 return probe; 554 } 555 556 public Publisher<T> createHelperPublisher(long elements) { 557 return SubscriberWhiteboxVerification.this.createHelperPublisher(elements); 558 } 559 560 public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) { 561 return new WhiteboxSubscriberProbe<T>(env, subscriber); 562 } 563 564 public T signalNext() throws InterruptedException { 565 return signalNext(nextT()); 566 } 567 568 private T signalNext(T element) throws InterruptedException { 569 sendNext(element); 570 return element; 571 } 572 573 public T nextT() throws InterruptedException { 574 lastT = tees.requestNextElement(); 575 return lastT; 576 } 577 578 public void verifyNoAsyncErrors() { 579 env.verifyNoAsyncErrors(); 580 } 581 } 582 583 /** 584 * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls, 585 * in order to allow intercepting calls on the underlying {@code Subscriber}. 586 * This delegation allows the proxy to implement {@link BlackboxProbe} assertions. 587 */ 588 public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> { 589 590 public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) { 591 super(env, Promise.<Subscriber<? super T>>completed(env, subscriber)); 592 } 593 594 @Override 595 public void onSubscribe(Subscription s) { 596 sub().onSubscribe(s); 597 } 598 599 @Override 600 public void onNext(T t) { 601 registerOnNext(t); 602 sub().onNext(t); 603 } 604 605 @Override 606 public void onError(Throwable cause) { 607 registerOnError(cause); 608 sub().onError(cause); 609 } 610 611 @Override 612 public void onComplete() { 613 registerOnComplete(); 614 sub().onComplete(); 615 } 616 } 617 618 public static class BlackboxProbe<T> implements SubscriberProbe<T> { 619 protected final TestEnvironment env; 620 protected final Promise<Subscriber<? super T>> subscriber; 621 622 protected final Receptacle<T> elements; 623 protected final Promise<Throwable> error; 624 625 public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) { 626 this.env = env; 627 this.subscriber = subscriber; 628 elements = new Receptacle<T>(env); 629 error = new Promise<Throwable>(env); 630 } 631 632 @Override 633 public void registerOnNext(T element) { 634 elements.add(element); 635 } 636 637 @Override 638 public void registerOnComplete() { 639 try { 640 elements.complete(); 641 } catch (IllegalStateException ex) { 642 // "Queue full", onComplete was already called 643 env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7"); 644 } 645 } 646 647 @Override 648 public void registerOnError(Throwable cause) { 649 try { 650 error.complete(cause); 651 } catch (IllegalStateException ex) { 652 // "Queue full", onError was already called 653 env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7"); 654 } 655 } 656 657 public T expectNext() throws InterruptedException { 658 return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub())); 659 } 660 661 public void expectNext(T expected) throws InterruptedException { 662 expectNext(expected, env.defaultTimeoutMillis()); 663 } 664 665 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 666 T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected)); 667 if (!received.equals(expected)) { 668 env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected)); 669 } 670 } 671 672 public Subscriber<? super T> sub() { 673 return subscriber.value(); 674 } 675 676 public void expectCompletion() throws InterruptedException { 677 expectCompletion(env.defaultTimeoutMillis()); 678 } 679 680 public void expectCompletion(long timeoutMillis) throws InterruptedException { 681 expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub())); 682 } 683 684 public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException { 685 elements.expectCompletion(timeoutMillis, msg); 686 } 687 688 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 689 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException { 690 final E err = expectError(expected); 691 String message = err.getMessage(); 692 assertTrue(message.contains(requiredMessagePart), 693 String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected)); 694 } 695 696 public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException { 697 return expectError(expected, env.defaultTimeoutMillis()); 698 } 699 700 @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) 701 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException { 702 error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 703 if (error.value() == null) { 704 return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 705 } else if (expected.isInstance(error.value())) { 706 return (E) error.value(); 707 } else { 708 return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected)); 709 } 710 } 711 712 public void expectError(Throwable expected) throws InterruptedException { 713 expectError(expected, env.defaultTimeoutMillis()); 714 } 715 716 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 717 public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException { 718 error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected)); 719 if (error.value() != expected) { 720 env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected)); 721 } 722 } 723 724 public void expectNone() throws InterruptedException { 725 expectNone(env.defaultNoSignalsTimeoutMillis()); 726 } 727 728 public void expectNone(long withinMillis) throws InterruptedException { 729 elements.expectNone(withinMillis, "Expected nothing"); 730 } 731 732 } 733 734 public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer { 735 protected Promise<SubscriberPuppet> puppet; 736 737 public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) { 738 super(env, subscriber); 739 puppet = new Promise<SubscriberPuppet>(env); 740 } 741 742 private SubscriberPuppet puppet() { 743 return puppet.value(); 744 } 745 746 @Override 747 public void registerOnSubscribe(SubscriberPuppet p) { 748 if (!puppet.isCompleted()) { 749 puppet.complete(p); 750 } 751 } 752 753 } 754 755 public interface SubscriberPuppeteer { 756 757 /** 758 * Must be called by the test subscriber when it has successfully registered a subscription 759 * inside the `onSubscribe` method. 760 */ 761 void registerOnSubscribe(SubscriberPuppet puppet); 762 } 763 764 public interface SubscriberProbe<T> { 765 766 /** 767 * Must be called by the test subscriber when it has received an`onNext` event. 768 */ 769 void registerOnNext(T element); 770 771 /** 772 * Must be called by the test subscriber when it has received an `onComplete` event. 773 */ 774 void registerOnComplete(); 775 776 /** 777 * Must be called by the test subscriber when it has received an `onError` event. 778 */ 779 void registerOnError(Throwable cause); 780 781 } 782 783 public interface SubscriberPuppet { 784 void triggerRequest(long elements); 785 786 void signalCancel(); 787 } 788 789 public void notVerified() { 790 throw new SkipException("Not verified using this TCK."); 791 } 792 793 public void notVerified(String msg) { 794 throw new SkipException(msg); 795 } 796}