001package org.reactivestreams.tck;
002
003import org.reactivestreams.Publisher;
004import org.reactivestreams.Subscriber;
005import org.reactivestreams.Subscription;
006import org.reactivestreams.tck.TestEnvironment.*;
007import org.reactivestreams.tck.support.Optional;
008import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
009import org.reactivestreams.tck.support.TestException;
010import org.testng.SkipException;
011import org.testng.annotations.AfterClass;
012import org.testng.annotations.BeforeClass;
013import org.testng.annotations.BeforeMethod;
014import org.testng.annotations.Test;
015
016import java.util.concurrent.ExecutorService;
017import java.util.concurrent.Executors;
018
019import static org.testng.Assert.assertTrue;
020
021/**
022 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} specification rules.
023 *
024 * @see org.reactivestreams.Subscriber
025 * @see org.reactivestreams.Subscription
026 */
027public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T>
028  implements SubscriberWhiteboxVerificationRules {
029
030  private final TestEnvironment env;
031
032  protected SubscriberWhiteboxVerification(TestEnvironment env) {
033    this.env = env;
034  }
035
036  // USER API
037
038  /**
039   * This is the main method you must implement in your test incarnation.
040   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
041   *
042   * In order to be meaningfully testable your Subscriber must inform the given
043   * `WhiteboxSubscriberProbe` of the respective events having been received.
044   */
045  public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);
046
047  // ENV SETUP
048
049  /**
050   * Executor service used by the default provided asynchronous Publisher.
051   * @see #createHelperPublisher(long)
052   */
053  private ExecutorService publisherExecutor;
054  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
055  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
056  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
057
058  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
059
060  @BeforeMethod
061  public void setUp() throws Exception {
062    env.clearAsyncErrors();
063  }
064
065  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
066
067  @Test
068  public void required_exerciseWhiteboxHappyPath() throws Throwable {
069    subscriberTest(new TestStageTestRun() {
070      @Override
071      public void run(WhiteboxTestStage stage) throws InterruptedException {
072        stage.puppet().triggerRequest(1);
073        stage.puppet().triggerRequest(1);
074
075        long receivedRequests = stage.expectRequest();
076
077        stage.signalNext();
078        stage.probe.expectNext(stage.lastT);
079
080        stage.puppet().triggerRequest(1);
081        if (receivedRequests == 1) {
082          stage.expectRequest();
083        }
084
085        stage.signalNext();
086        stage.probe.expectNext(stage.lastT);
087
088        stage.puppet().signalCancel();
089        stage.expectCancelling();
090
091        stage.verifyNoAsyncErrors();
092      }
093    });
094  }
095
096  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
097
098  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
099  @Override @Test
100  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
101    subscriberTest(new TestStageTestRun() {
102      @Override
103      public void run(WhiteboxTestStage stage) throws InterruptedException {
104        stage.puppet().triggerRequest(1);
105        stage.expectRequest();
106
107        stage.signalNext();
108      }
109    });
110  }
111
112  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
113  @Override @Test
114  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
115    notVerified(); // cannot be meaningfully tested, or can it?
116  }
117
118  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
119  @Override @Test
120  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
121    subscriberTestWithoutSetup(new TestStageTestRun() {
122      @Override
123      public void run(WhiteboxTestStage stage) throws Throwable {
124        final Subscription subs = new Subscription() {
125          @Override
126          public void request(long n) {
127            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
128            if (onCompleteStackTraceElement.isDefined()) {
129              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
130              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
131                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
132            }
133          }
134
135          @Override
136          public void cancel() {
137            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
138            if (onCompleteStackElement.isDefined()) {
139              final StackTraceElement stackElem = onCompleteStackElement.get();
140              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
141                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
142            }
143          }
144        };
145
146        stage.probe = stage.createWhiteboxSubscriberProbe(env);
147        final Subscriber<T> sub = createSubscriber(stage.probe);
148
149        sub.onSubscribe(subs);
150        sub.onComplete();
151
152        env.verifyNoAsyncErrorsNoDelay();
153      }
154    });
155  }
156
157  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
158  @Override @Test
159  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
160    subscriberTestWithoutSetup(new TestStageTestRun() {
161      @Override
162      public void run(WhiteboxTestStage stage) throws Throwable {
163        final Subscription subs = new Subscription() {
164          @Override
165          public void request(long n) {
166            Throwable thr = new Throwable();
167            for (StackTraceElement stackElem : thr.getStackTrace()) {
168              if (stackElem.getMethodName().equals("onError")) {
169                env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
170                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
171              }
172            }
173          }
174
175          @Override
176          public void cancel() {
177            Throwable thr = new Throwable();
178            for (StackTraceElement stackElem : thr.getStackTrace()) {
179              if (stackElem.getMethodName().equals("onError")) {
180                env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
181                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
182              }
183            }
184          }
185        };
186
187        stage.probe = stage.createWhiteboxSubscriberProbe(env);
188        final Subscriber<T> sub = createSubscriber(stage.probe);
189
190        sub.onSubscribe(subs);
191        sub.onError(new TestException());
192
193        env.verifyNoAsyncErrorsNoDelay();
194      }
195    });
196  }
197
198  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
199  @Override @Test
200  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
201    notVerified(); // cannot be meaningfully tested, or can it?
202  }
203
204  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
205  @Override @Test
206  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
207    subscriberTest(new TestStageTestRun() {
208      @Override
209      public void run(WhiteboxTestStage stage) throws Throwable {
210        // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
211        final Latch secondSubscriptionCancelled = new Latch(env);
212        final Subscriber<? super T> sub = stage.sub();
213        final Subscription subscription = new Subscription() {
214          @Override
215          public void request(long elements) {
216            // ignore...
217          }
218
219          @Override
220          public void cancel() {
221            secondSubscriptionCancelled.close();
222          }
223
224          @Override
225          public String toString() {
226            return "SecondSubscription(should get cancelled)";
227          }
228        };
229        sub.onSubscribe(subscription);
230
231        secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
232        env.verifyNoAsyncErrors();
233      }
234    });
235  }
236
237  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
238  @Override @Test
239  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
240    notVerified(); // cannot be meaningfully tested, or can it?
241  }
242
243  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
244  @Override @Test
245  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
246    notVerified(); // cannot be meaningfully tested, or can it?
247    // the same thread part of the clause can be verified but that is not very useful, or is it?
248  }
249
250  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
251  @Override @Test
252  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
253    subscriberTest(new TestStageTestRun() {
254      @Override
255      public void run(WhiteboxTestStage stage) throws InterruptedException {
256        stage.puppet().triggerRequest(1);
257        stage.puppet().signalCancel();
258        stage.signalNext();
259
260        stage.puppet().triggerRequest(1);
261        stage.puppet().triggerRequest(1);
262
263        stage.verifyNoAsyncErrors();
264      }
265    });
266  }
267
268  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
269  @Override @Test
270  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
271    subscriberTest(new TestStageTestRun() {
272      @Override
273      public void run(WhiteboxTestStage stage) throws InterruptedException {
274        stage.puppet().triggerRequest(1);
275        stage.sendCompletion();
276        stage.probe.expectCompletion();
277
278        stage.verifyNoAsyncErrors();
279      }
280    });
281  }
282
283  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
284  @Override @Test
285  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
286    subscriberTest(new TestStageTestRun() {
287      @Override
288      public void run(WhiteboxTestStage stage) throws InterruptedException {
289        stage.sendCompletion();
290        stage.probe.expectCompletion();
291
292        stage.verifyNoAsyncErrors();
293      }
294    });
295  }
296
297  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
298  @Override @Test
299  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
300    subscriberTest(new TestStageTestRun() {
301      @Override
302      public void run(WhiteboxTestStage stage) throws InterruptedException {
303        stage.puppet().triggerRequest(1);
304        stage.puppet().triggerRequest(1);
305
306        Exception ex = new TestException();
307        stage.sendError(ex);
308        stage.probe.expectError(ex);
309
310        env.verifyNoAsyncErrorsNoDelay();
311      }
312    });
313  }
314
315  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
316  @Override @Test
317  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
318    subscriberTest(new TestStageTestRun() {
319      @Override
320      public void run(WhiteboxTestStage stage) throws InterruptedException {
321        Exception ex = new TestException();
322        stage.sendError(ex);
323        stage.probe.expectError(ex);
324
325        env.verifyNoAsyncErrorsNoDelay();
326      }
327    });
328  }
329
330  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
331  @Override @Test
332  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
333    notVerified(); // cannot be meaningfully tested, or can it?
334  }
335
336  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
337  @Override @Test
338  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
339    notVerified(); // cannot be meaningfully tested, or can it?
340  }
341
342  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
343  @Override @Test
344  public void untested_spec213_failingOnSignalInvocation() throws Exception {
345    notVerified(); // cannot be meaningfully tested, or can it?
346  }
347
348  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
349  @Override @Test
350  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
351    subscriberTest(new TestStageTestRun() {
352      @Override
353      public void run(WhiteboxTestStage stage) throws Throwable {
354
355        final Subscriber<? super T> sub = stage.sub();
356        boolean gotNPE = false;
357        try {
358          sub.onSubscribe(null);
359        } catch (final NullPointerException expected) {
360          gotNPE = true;
361        } 
362
363        assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
364        env.verifyNoAsyncErrorsNoDelay();
365      }
366    });
367  }
368
369  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
370  @Override @Test
371  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
372    subscriberTest(new TestStageTestRun() {
373      @Override
374      public void run(WhiteboxTestStage stage) throws Throwable {
375
376        final Subscriber<? super T> sub = stage.sub();
377        boolean gotNPE = false;
378        try {
379          sub.onNext(null);
380        } catch (final NullPointerException expected) {
381          gotNPE = true;
382        }
383
384        assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
385        env.verifyNoAsyncErrorsNoDelay();
386      }
387    });
388  }
389
390  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
391  @Override @Test
392  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
393    subscriberTest(new TestStageTestRun() {
394      @Override
395      public void run(WhiteboxTestStage stage) throws Throwable {
396
397          final Subscriber<? super T> sub = stage.sub();
398          boolean gotNPE = false;
399          try {
400            sub.onError(null);
401          } catch (final NullPointerException expected) {
402            gotNPE = true;
403          } finally {
404            assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
405          }
406
407        env.verifyNoAsyncErrorsNoDelay();
408      }
409    });
410  }
411
412
413  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
414
415  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
416  @Override @Test
417  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
418    notVerified(); // cannot be meaningfully tested, or can it?
419  }
420
421  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
422  @Override @Test
423  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
424    subscriberTest(new TestStageTestRun() {
425      @Override
426      public void run(WhiteboxTestStage stage) throws InterruptedException {
427        stage.puppet().triggerRequest(2);
428        stage.probe.expectNext(stage.signalNext());
429        stage.probe.expectNext(stage.signalNext());
430
431        stage.probe.expectNone();
432        stage.puppet().triggerRequest(3);
433
434        stage.verifyNoAsyncErrors();
435      }
436    });
437  }
438
439  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
440  @Override @Test
441  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
442    notVerified(); // cannot be meaningfully tested, or can it?
443  }
444
445  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
446  @Override @Test
447  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
448    notVerified(); // cannot be meaningfully tested, or can it?
449  }
450
451  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
452  @Override @Test
453  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
454    notVerified(); // cannot be meaningfully tested, or can it?
455  }
456
457  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
458  @Override @Test
459  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
460    notVerified(); // cannot be meaningfully tested, or can it?
461  }
462
463  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
464  @Override @Test
465  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
466    notVerified(); // cannot be meaningfully tested, or can it?
467  }
468
469  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
470
471  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
472
473  abstract class TestStageTestRun {
474    public abstract void run(WhiteboxTestStage stage) throws Throwable;
475  }
476
477  /**
478   * Prepares subscriber and publisher pair (by subscribing the first to the latter),
479   * and then hands over the tests {@link WhiteboxTestStage} over to the test.
480   *
481   * The test stage is, like in a puppet show, used to orchestrate what each participant should do.
482   * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
483   */
484  public void subscriberTest(TestStageTestRun body) throws Throwable {
485    WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
486    body.run(stage);
487  }
488
489  /**
490   * Provides a {@link WhiteboxTestStage} without performing any additional setup,
491   * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would.
492   *
493   * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
494   */
495  public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
496    WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
497    body.run(stage);
498  }
499
500  /**
501   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
502   */
503  public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
504    try {
505      subscriberTestWithoutSetup(body);
506    } catch (Exception ex) {
507      notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement.");
508    }
509  }
510
511  public class WhiteboxTestStage extends ManualPublisher<T> {
512    public Publisher<T> pub;
513    public ManualSubscriber<T> tees; // gives us access to a stream T values
514    public WhiteboxSubscriberProbe<T> probe;
515
516    public T lastT = null;
517
518    public WhiteboxTestStage(TestEnvironment env) throws InterruptedException {
519      this(env, true);
520    }
521
522    public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
523      super(env);
524      if (runDefaultInit) {
525        pub = this.createHelperPublisher(Long.MAX_VALUE);
526        tees = env.newManualSubscriber(pub);
527        probe = new WhiteboxSubscriberProbe<T>(env, subscriber);
528        subscribe(createSubscriber(probe));
529        probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
530      }
531    }
532
533    public Subscriber<? super T> sub() {
534      return subscriber.value();
535    }
536
537    public SubscriberPuppet puppet() {
538      return probe.puppet();
539    }
540
541    public WhiteboxSubscriberProbe<T> probe() {
542      return probe;
543    }
544
545    public Publisher<T> createHelperPublisher(long elements) {
546      return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
547    }
548
549    public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) {
550      return new WhiteboxSubscriberProbe<T>(env, subscriber);
551    }
552
553    public T signalNext() throws InterruptedException {
554      return signalNext(nextT());
555    }
556
557    private T signalNext(T element) throws InterruptedException {
558      sendNext(element);
559      return element;
560    }
561
562    public T nextT() throws InterruptedException {
563      lastT = tees.requestNextElement();
564      return lastT;
565    }
566
567    public void verifyNoAsyncErrors() {
568      env.verifyNoAsyncErrors();
569    }
570  }
571
572  /**
573   * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
574   * in order to allow intercepting calls on the underlying {@code Subscriber}.
575   * This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
576   */
577  public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
578
579    public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
580      super(env, Promise.<Subscriber<? super T>>completed(env, subscriber));
581    }
582
583    @Override
584    public void onSubscribe(Subscription s) {
585      sub().onSubscribe(s);
586    }
587
588    @Override
589    public void onNext(T t) {
590      registerOnNext(t);
591      sub().onNext(t);
592    }
593
594    @Override
595    public void onError(Throwable cause) {
596      registerOnError(cause);
597      sub().onError(cause);
598    }
599
600    @Override
601    public void onComplete() {
602      registerOnComplete();
603      sub().onComplete();
604    }
605  }
606
607  public static class BlackboxProbe<T> implements SubscriberProbe<T> {
608    protected final TestEnvironment env;
609    protected final Promise<Subscriber<? super T>> subscriber;
610
611    protected final Receptacle<T> elements;
612    protected final Promise<Throwable> error;
613
614    public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
615      this.env = env;
616      this.subscriber = subscriber;
617      elements = new Receptacle<T>(env);
618      error = new Promise<Throwable>(env);
619    }
620
621    @Override
622    public void registerOnNext(T element) {
623      elements.add(element);
624    }
625
626    @Override
627    public void registerOnComplete() {
628      try {
629        elements.complete();
630      } catch (IllegalStateException ex) {
631        // "Queue full", onComplete was already called
632        env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7");
633      }
634    }
635
636    @Override
637    public void registerOnError(Throwable cause) {
638      try {
639        error.complete(cause);
640      } catch (IllegalStateException ex) {
641        // "Queue full", onError was already called
642        env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7");
643      }
644    }
645
646    public T expectNext() throws InterruptedException {
647      return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub()));
648    }
649
650    public void expectNext(T expected) throws InterruptedException {
651      expectNext(expected, env.defaultTimeoutMillis());
652    }
653
654    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
655      T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
656      if (!received.equals(expected)) {
657        env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
658      }
659    }
660
661    public Subscriber<? super T> sub() {
662      return subscriber.value();
663    }
664
665    public void expectCompletion() throws InterruptedException {
666      expectCompletion(env.defaultTimeoutMillis());
667    }
668
669    public void expectCompletion(long timeoutMillis) throws InterruptedException {
670      expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
671    }
672
673    public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException {
674      elements.expectCompletion(timeoutMillis, msg);
675    }
676
677    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
678    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
679      final E err = expectError(expected);
680      String message = err.getMessage();
681      assertTrue(message.contains(requiredMessagePart),
682        String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected));
683    }
684
685    public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException {
686      return expectError(expected, env.defaultTimeoutMillis());
687    }
688
689    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
690    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
691      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
692      if (error.value() == null) {
693        return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
694      } else if (expected.isInstance(error.value())) {
695        return (E) error.value();
696      } else {
697        return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
698      }
699    }
700
701    public void expectError(Throwable expected) throws InterruptedException {
702      expectError(expected, env.defaultTimeoutMillis());
703    }
704
705    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
706    public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
707      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
708      if (error.value() != expected) {
709        env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
710      }
711    }
712
713    public void expectNone() throws InterruptedException {
714      expectNone(env.defaultTimeoutMillis());
715    }
716
717    public void expectNone(long withinMillis) throws InterruptedException {
718      elements.expectNone(withinMillis, "Expected nothing");
719    }
720
721  }
722
723  public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
724    protected Promise<SubscriberPuppet> puppet;
725
726    public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
727      super(env, subscriber);
728      puppet = new Promise<SubscriberPuppet>(env);
729    }
730
731    private SubscriberPuppet puppet() {
732      return puppet.value();
733    }
734
735    @Override
736    public void registerOnSubscribe(SubscriberPuppet p) {
737      if (!puppet.isCompleted()) {
738        puppet.complete(p);
739      } 
740    }
741
742  }
743
744  public interface SubscriberPuppeteer {
745
746    /**
747     * Must be called by the test subscriber when it has successfully registered a subscription
748     * inside the `onSubscribe` method.
749     */
750    void registerOnSubscribe(SubscriberPuppet puppet);
751  }
752
753  public interface SubscriberProbe<T> {
754
755    /**
756     * Must be called by the test subscriber when it has received an`onNext` event.
757     */
758    void registerOnNext(T element);
759
760    /**
761     * Must be called by the test subscriber when it has received an `onComplete` event.
762     */
763    void registerOnComplete();
764
765    /**
766     * Must be called by the test subscriber when it has received an `onError` event.
767     */
768    void registerOnError(Throwable cause);
769
770  }
771
772  public interface SubscriberPuppet {
773    void triggerRequest(long elements);
774
775    void signalCancel();
776  }
777
778  public void notVerified() {
779    throw new SkipException("Not verified using this TCK.");
780  }
781
782  public void notVerified(String msg) {
783    throw new SkipException(msg);
784  }
785}