001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.TestEnvironment.*;
018import org.reactivestreams.tck.flow.support.Optional;
019import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
020import org.reactivestreams.tck.flow.support.TestException;
021import org.testng.SkipException;
022import org.testng.annotations.AfterClass;
023import org.testng.annotations.BeforeClass;
024import org.testng.annotations.BeforeMethod;
025import org.testng.annotations.Test;
026
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029
030import static org.testng.Assert.assertTrue;
031
032/**
033 * Provides whitebox style tests for verifying {@link org.reactivestreams.Subscriber}
034 * and {@link org.reactivestreams.Subscription} specification rules.
035 *
036 * @see org.reactivestreams.Subscriber
037 * @see org.reactivestreams.Subscription
038 */
039public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T>
040  implements SubscriberWhiteboxVerificationRules {
041
042  private final TestEnvironment env;
043
044  protected SubscriberWhiteboxVerification(TestEnvironment env) {
045    this.env = env;
046  }
047
048  // USER API
049
050  /**
051   * This is the main method you must implement in your test incarnation.
052   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
053   *
054   * In order to be meaningfully testable your Subscriber must inform the given
055   * `WhiteboxSubscriberProbe` of the respective events having been received.
056   */
057  public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);
058
059  // ENV SETUP
060
061  /**
062   * Executor service used by the default provided asynchronous Publisher.
063   * @see #createHelperPublisher(long)
064   */
065  private ExecutorService publisherExecutor;
066  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
067  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
068  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
069
070  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
071
072  @BeforeMethod
073  public void setUp() throws Exception {
074    env.clearAsyncErrors();
075  }
076
077  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
078
079  @Test
080  public void required_exerciseWhiteboxHappyPath() throws Throwable {
081    subscriberTest(new TestStageTestRun() {
082      @Override
083      public void run(WhiteboxTestStage stage) throws InterruptedException {
084        stage.puppet().triggerRequest(1);
085        stage.puppet().triggerRequest(1);
086
087        long receivedRequests = stage.expectRequest();
088
089        stage.signalNext();
090        stage.probe.expectNext(stage.lastT);
091
092        stage.puppet().triggerRequest(1);
093        if (receivedRequests == 1) {
094          stage.expectRequest();
095        }
096
097        stage.signalNext();
098        stage.probe.expectNext(stage.lastT);
099
100        stage.puppet().signalCancel();
101        stage.expectCancelling();
102
103        stage.verifyNoAsyncErrors();
104      }
105    });
106  }
107
108  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
109
110  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
111  @Override @Test
112  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
113    subscriberTest(new TestStageTestRun() {
114      @Override
115      public void run(WhiteboxTestStage stage) throws InterruptedException {
116        stage.puppet().triggerRequest(1);
117        stage.expectRequest();
118
119        stage.signalNext();
120      }
121    });
122  }
123
124  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
125  @Override @Test
126  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
127    notVerified(); // cannot be meaningfully tested, or can it?
128  }
129
130  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
131  @Override @Test
132  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
133    subscriberTestWithoutSetup(new TestStageTestRun() {
134      @Override
135      public void run(WhiteboxTestStage stage) throws Throwable {
136        final Subscription subs = new Subscription() {
137          @Override
138          public void request(long n) {
139            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
140            if (onCompleteStackTraceElement.isDefined()) {
141              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
142              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
143                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
144            }
145          }
146
147          @Override
148          public void cancel() {
149            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
150            if (onCompleteStackElement.isDefined()) {
151              final StackTraceElement stackElem = onCompleteStackElement.get();
152              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
153                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
154            }
155          }
156        };
157
158        stage.probe = stage.createWhiteboxSubscriberProbe(env);
159        final Subscriber<T> sub = createSubscriber(stage.probe);
160
161        sub.onSubscribe(subs);
162        sub.onComplete();
163
164        env.verifyNoAsyncErrorsNoDelay();
165      }
166    });
167  }
168
169  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
170  @Override @Test
171  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
172    subscriberTestWithoutSetup(new TestStageTestRun() {
173      @Override
174      public void run(WhiteboxTestStage stage) throws Throwable {
175        final Subscription subs = new Subscription() {
176          @Override
177          public void request(long n) {
178            Throwable thr = new Throwable();
179            for (StackTraceElement stackElem : thr.getStackTrace()) {
180              if (stackElem.getMethodName().equals("onError")) {
181                env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
182                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
183              }
184            }
185          }
186
187          @Override
188          public void cancel() {
189            Throwable thr = new Throwable();
190            for (StackTraceElement stackElem : thr.getStackTrace()) {
191              if (stackElem.getMethodName().equals("onError")) {
192                env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
193                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
194              }
195            }
196          }
197        };
198
199        stage.probe = stage.createWhiteboxSubscriberProbe(env);
200        final Subscriber<T> sub = createSubscriber(stage.probe);
201
202        sub.onSubscribe(subs);
203        sub.onError(new TestException());
204
205        env.verifyNoAsyncErrorsNoDelay();
206      }
207    });
208  }
209
210  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
211  @Override @Test
212  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
213    notVerified(); // cannot be meaningfully tested, or can it?
214  }
215
216  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
217  @Override @Test
218  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
219    subscriberTest(new TestStageTestRun() {
220      @Override
221      public void run(WhiteboxTestStage stage) throws Throwable {
222        // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
223        final Latch secondSubscriptionCancelled = new Latch(env);
224        final Subscriber<? super T> sub = stage.sub();
225        final Subscription subscription = new Subscription() {
226          @Override
227          public void request(long elements) {
228            // ignore...
229          }
230
231          @Override
232          public void cancel() {
233            secondSubscriptionCancelled.close();
234          }
235
236          @Override
237          public String toString() {
238            return "SecondSubscription(should get cancelled)";
239          }
240        };
241        sub.onSubscribe(subscription);
242
243        secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
244        env.verifyNoAsyncErrors();
245      }
246    });
247  }
248
249  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
250  @Override @Test
251  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
252    notVerified(); // cannot be meaningfully tested, or can it?
253  }
254
255  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
256  @Override @Test
257  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
258    notVerified(); // cannot be meaningfully tested, or can it?
259    // the same thread part of the clause can be verified but that is not very useful, or is it?
260  }
261
262  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
263  @Override @Test
264  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
265    subscriberTest(new TestStageTestRun() {
266      @Override
267      public void run(WhiteboxTestStage stage) throws InterruptedException {
268        stage.puppet().triggerRequest(1);
269        stage.expectRequest();
270        stage.puppet().signalCancel();
271        stage.expectCancelling();
272        stage.signalNext();
273
274        stage.puppet().triggerRequest(1);
275        stage.puppet().triggerRequest(1);
276
277        stage.verifyNoAsyncErrors();
278      }
279    });
280  }
281
282  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
283  @Override @Test
284  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
285    subscriberTest(new TestStageTestRun() {
286      @Override
287      public void run(WhiteboxTestStage stage) throws InterruptedException {
288        stage.puppet().triggerRequest(1);
289        stage.sendCompletion();
290        stage.probe.expectCompletion();
291
292        stage.verifyNoAsyncErrors();
293      }
294    });
295  }
296
297  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
298  @Override @Test
299  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
300    subscriberTest(new TestStageTestRun() {
301      @Override
302      public void run(WhiteboxTestStage stage) throws InterruptedException {
303        stage.sendCompletion();
304        stage.probe.expectCompletion();
305
306        stage.verifyNoAsyncErrors();
307      }
308    });
309  }
310
311  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
312  @Override @Test
313  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
314    subscriberTest(new TestStageTestRun() {
315      @Override
316      public void run(WhiteboxTestStage stage) throws InterruptedException {
317        stage.puppet().triggerRequest(1);
318        stage.puppet().triggerRequest(1);
319
320        Exception ex = new TestException();
321        stage.sendError(ex);
322        stage.probe.expectError(ex);
323
324        env.verifyNoAsyncErrorsNoDelay();
325      }
326    });
327  }
328
329  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
330  @Override @Test
331  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
332    subscriberTest(new TestStageTestRun() {
333      @Override
334      public void run(WhiteboxTestStage stage) throws InterruptedException {
335        Exception ex = new TestException();
336        stage.sendError(ex);
337        stage.probe.expectError(ex);
338
339        env.verifyNoAsyncErrorsNoDelay();
340      }
341    });
342  }
343
344  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
345  @Override @Test
346  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
347    notVerified(); // cannot be meaningfully tested, or can it?
348  }
349
350  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
351  @Override @Test
352  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
353    notVerified(); // cannot be meaningfully tested, or can it?
354  }
355
356  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
357  @Override @Test
358  public void untested_spec213_failingOnSignalInvocation() throws Exception {
359    notVerified(); // cannot be meaningfully tested, or can it?
360  }
361
362  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
363  @Override @Test
364  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
365    subscriberTest(new TestStageTestRun() {
366      @Override
367      public void run(WhiteboxTestStage stage) throws Throwable {
368
369        final Subscriber<? super T> sub = stage.sub();
370        boolean gotNPE = false;
371        try {
372          sub.onSubscribe(null);
373        } catch (final NullPointerException expected) {
374          gotNPE = true;
375        }
376
377        assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
378        env.verifyNoAsyncErrorsNoDelay();
379      }
380    });
381  }
382
383  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
384  @Override @Test
385  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
386    subscriberTest(new TestStageTestRun() {
387      @Override
388      public void run(WhiteboxTestStage stage) throws Throwable {
389
390        final Subscriber<? super T> sub = stage.sub();
391        boolean gotNPE = false;
392        try {
393          sub.onNext(null);
394        } catch (final NullPointerException expected) {
395          gotNPE = true;
396        }
397
398        assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
399        env.verifyNoAsyncErrorsNoDelay();
400      }
401    });
402  }
403
404  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
405  @Override @Test
406  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
407    subscriberTest(new TestStageTestRun() {
408      @Override
409      public void run(WhiteboxTestStage stage) throws Throwable {
410
411          final Subscriber<? super T> sub = stage.sub();
412          boolean gotNPE = false;
413          try {
414            sub.onError(null);
415          } catch (final NullPointerException expected) {
416            gotNPE = true;
417          } finally {
418            assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
419          }
420
421        env.verifyNoAsyncErrorsNoDelay();
422      }
423    });
424  }
425
426
427  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
428
429  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
430  @Override @Test
431  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
432    notVerified(); // cannot be meaningfully tested, or can it?
433  }
434
435  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
436  @Override @Test
437  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
438    subscriberTest(new TestStageTestRun() {
439      @Override
440      public void run(WhiteboxTestStage stage) throws InterruptedException {
441        stage.puppet().triggerRequest(2);
442        long requestedElements = stage.expectRequest();
443        stage.probe.expectNext(stage.signalNext());
444        // Some subscribers may only request one element at a time.
445        if (requestedElements < 2) {
446          stage.expectRequest();
447        }
448        stage.probe.expectNext(stage.signalNext());
449
450        stage.probe.expectNone();
451        stage.puppet().triggerRequest(3);
452
453        stage.verifyNoAsyncErrors();
454      }
455    });
456  }
457
458  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
459  @Override @Test
460  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
461    notVerified(); // cannot be meaningfully tested, or can it?
462  }
463
464  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
465  @Override @Test
466  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
467    notVerified(); // cannot be meaningfully tested, or can it?
468  }
469
470  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
471  @Override @Test
472  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
473    notVerified(); // cannot be meaningfully tested, or can it?
474  }
475
476  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
477  @Override @Test
478  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
479    notVerified(); // cannot be meaningfully tested, or can it?
480  }
481
482  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
483  @Override @Test
484  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
485    notVerified(); // cannot be meaningfully tested, or can it?
486  }
487
488  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
489
490  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
491
492  abstract class TestStageTestRun {
493    public abstract void run(WhiteboxTestStage stage) throws Throwable;
494  }
495
496  /**
497   * Prepares subscriber and publisher pair (by subscribing the first to the latter),
498   * and then hands over the tests {@link WhiteboxTestStage} over to the test.
499   *
500   * The test stage is, like in a puppet show, used to orchestrate what each participant should do.
501   * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
502   */
503  public void subscriberTest(TestStageTestRun body) throws Throwable {
504    WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
505    body.run(stage);
506  }
507
508  /**
509   * Provides a {@link WhiteboxTestStage} without performing any additional setup,
510   * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would.
511   *
512   * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
513   */
514  public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
515    WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
516    body.run(stage);
517  }
518
519  /**
520   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
521   */
522  public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
523    try {
524      subscriberTestWithoutSetup(body);
525    } catch (Exception ex) {
526      notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement.");
527    }
528  }
529
530  public class WhiteboxTestStage extends ManualPublisher<T> {
531    public Publisher<T> pub;
532    public ManualSubscriber<T> tees; // gives us access to a stream T values
533    public WhiteboxSubscriberProbe<T> probe;
534
535    public T lastT = null;
536
537    public WhiteboxTestStage(TestEnvironment env) throws InterruptedException {
538      this(env, true);
539    }
540
541    public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
542      super(env);
543      if (runDefaultInit) {
544        pub = this.createHelperPublisher(Long.MAX_VALUE);
545        tees = env.newManualSubscriber(pub);
546        probe = new WhiteboxSubscriberProbe<T>(env, subscriber);
547        subscribe(createSubscriber(probe));
548        probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
549        env.verifyNoAsyncErrorsNoDelay();
550      }
551    }
552
553    public Subscriber<? super T> sub() {
554      return subscriber.value();
555    }
556
557    public SubscriberPuppet puppet() {
558      return probe.puppet();
559    }
560
561    public WhiteboxSubscriberProbe<T> probe() {
562      return probe;
563    }
564
565    public Publisher<T> createHelperPublisher(long elements) {
566      return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
567    }
568
569    public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) {
570      return new WhiteboxSubscriberProbe<T>(env, subscriber);
571    }
572
573    public T signalNext() throws InterruptedException {
574      return signalNext(nextT());
575    }
576
577    private T signalNext(T element) throws InterruptedException {
578      sendNext(element);
579      return element;
580    }
581
582    public T nextT() throws InterruptedException {
583      lastT = tees.requestNextElement();
584      return lastT;
585    }
586
587    public void verifyNoAsyncErrors() {
588      env.verifyNoAsyncErrors();
589    }
590  }
591
592  /**
593   * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
594   * in order to allow intercepting calls on the underlying {@code Subscriber}.
595   * This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
596   */
597  public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
598
599    public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
600      super(env, Promise.<Subscriber<? super T>>completed(env, subscriber));
601    }
602
603    @Override
604    public void onSubscribe(Subscription s) {
605      sub().onSubscribe(s);
606    }
607
608    @Override
609    public void onNext(T t) {
610      registerOnNext(t);
611      sub().onNext(t);
612    }
613
614    @Override
615    public void onError(Throwable cause) {
616      registerOnError(cause);
617      sub().onError(cause);
618    }
619
620    @Override
621    public void onComplete() {
622      registerOnComplete();
623      sub().onComplete();
624    }
625  }
626
627  public static class BlackboxProbe<T> implements SubscriberProbe<T> {
628    protected final TestEnvironment env;
629    protected final Promise<Subscriber<? super T>> subscriber;
630
631    protected final Receptacle<T> elements;
632    protected final Promise<Throwable> error;
633
634    public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
635      this.env = env;
636      this.subscriber = subscriber;
637      elements = new Receptacle<T>(env);
638      error = new Promise<Throwable>(env);
639    }
640
641    @Override
642    public void registerOnNext(T element) {
643      elements.add(element);
644    }
645
646    @Override
647    public void registerOnComplete() {
648      try {
649        elements.complete();
650      } catch (IllegalStateException ex) {
651        // "Queue full", onComplete was already called
652        env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7");
653      }
654    }
655
656    @Override
657    public void registerOnError(Throwable cause) {
658      try {
659        error.complete(cause);
660      } catch (IllegalStateException ex) {
661        // "Queue full", onError was already called
662        env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7");
663      }
664    }
665
666    public T expectNext() throws InterruptedException {
667      return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub()));
668    }
669
670    public void expectNext(T expected) throws InterruptedException {
671      expectNext(expected, env.defaultTimeoutMillis());
672    }
673
674    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
675      T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
676      if (!received.equals(expected)) {
677        env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
678      }
679    }
680
681    public Subscriber<? super T> sub() {
682      return subscriber.value();
683    }
684
685    public void expectCompletion() throws InterruptedException {
686      expectCompletion(env.defaultTimeoutMillis());
687    }
688
689    public void expectCompletion(long timeoutMillis) throws InterruptedException {
690      expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
691    }
692
693    public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException {
694      elements.expectCompletion(timeoutMillis, msg);
695    }
696
697    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
698    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
699      final E err = expectError(expected);
700      String message = err.getMessage();
701      assertTrue(message.contains(requiredMessagePart),
702        String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected));
703    }
704
705    public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException {
706      return expectError(expected, env.defaultTimeoutMillis());
707    }
708
709    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
710    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
711      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
712      if (error.value() == null) {
713        return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
714      } else if (expected.isInstance(error.value())) {
715        return (E) error.value();
716      } else {
717        return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
718      }
719    }
720
721    public void expectError(Throwable expected) throws InterruptedException {
722      expectError(expected, env.defaultTimeoutMillis());
723    }
724
725    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
726    public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
727      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
728      if (error.value() != expected) {
729        env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
730      }
731    }
732
733    public void expectNone() throws InterruptedException {
734      expectNone(env.defaultNoSignalsTimeoutMillis());
735    }
736
737    public void expectNone(long withinMillis) throws InterruptedException {
738      elements.expectNone(withinMillis, "Expected nothing");
739    }
740
741  }
742
743  public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
744    protected Promise<SubscriberPuppet> puppet;
745
746    public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
747      super(env, subscriber);
748      puppet = new Promise<SubscriberPuppet>(env);
749    }
750
751    private SubscriberPuppet puppet() {
752      return puppet.value();
753    }
754
755    @Override
756    public void registerOnSubscribe(SubscriberPuppet p) {
757      if (!puppet.isCompleted()) {
758        puppet.complete(p);
759      }
760    }
761
762  }
763
764  public interface SubscriberPuppeteer {
765
766    /**
767     * Must be called by the test subscriber when it has successfully registered a subscription
768     * inside the `onSubscribe` method.
769     */
770    void registerOnSubscribe(SubscriberPuppet puppet);
771  }
772
773  public interface SubscriberProbe<T> {
774
775    /**
776     * Must be called by the test subscriber when it has received an`onNext` event.
777     */
778    void registerOnNext(T element);
779
780    /**
781     * Must be called by the test subscriber when it has received an `onComplete` event.
782     */
783    void registerOnComplete();
784
785    /**
786     * Must be called by the test subscriber when it has received an `onError` event.
787     */
788    void registerOnError(Throwable cause);
789
790  }
791
792  /**
793   * Implement this puppet in your Whitebox style tests.
794   * The test suite will invoke the specific trigger/signal methods requesting you to execute the specific action.
795   * Since this is a whitebox style test, you're allowed and expected to use knowladge about your implementation to
796   * make implement these calls.
797   */
798  public interface SubscriberPuppet {
799
800    /**
801     * Ensure that at least {@code elements} are eventually requested by your {@link Subscriber}, if it hasn't already
802     * requested that many elements.
803     * <p>
804     * This does not necessarily have to correlate 1:1 with a {@code Subscription.request(elements)} call, but the sum
805     * of the elements requested by your {@code Subscriber} must eventually be at least the the sum of the elements
806     * triggered to be requested by all the invocations of this method.
807     * <p>
808     * Additionally, subscribers are permitted to delay requesting elements until previous requests for elements have
809     * been fulfilled. For example, a subscriber that only requests one element at a time may fulfill the request made
810     * by this method by requesting one element {@code elements} times, waiting for each element to arrive before the
811     * next request is made.
812     * <p>
813     * Before sending any element to the subscriber, the TCK must wait for the subscriber to request that element, and
814     * must be prepared for the subscriber to only request one element at a time, it is not enough for the TCK to
815     * simply invoke this method before sending elements.
816     * <p>
817     * An invocation of {@link #signalCancel()} may be coalesced into any elements that have not yet been requested,
818     * such that only a cancel signal is emitted.
819     */
820    void triggerRequest(long elements);
821
822    /**
823     * Trigger {@code cancel()} on your {@link Subscriber}.
824     * <p>
825     * An invocation of this method may be coalesced into any outstanding requests, as requested by
826     * {@link #triggerRequest(long)}, such that only a cancel signal is emitted.
827     */
828    void signalCancel();
829  }
830
831  public void notVerified() {
832    throw new SkipException("Not verified using this TCK.");
833  }
834
835  public void notVerified(String msg) {
836    throw new SkipException(msg);
837  }
838}