001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.TestEnvironment.*;
018import org.reactivestreams.tck.flow.support.Optional;
019import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
020import org.reactivestreams.tck.flow.support.TestException;
021import org.testng.SkipException;
022import org.testng.annotations.AfterClass;
023import org.testng.annotations.BeforeClass;
024import org.testng.annotations.BeforeMethod;
025import org.testng.annotations.Test;
026
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029
030import static org.testng.Assert.assertTrue;
031
032/**
033 * Provides whitebox style tests for verifying {@link org.reactivestreams.Subscriber}
034 * and {@link org.reactivestreams.Subscription} specification rules.
035 *
036 * @see org.reactivestreams.Subscriber
037 * @see org.reactivestreams.Subscription
038 */
039public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T>
040  implements SubscriberWhiteboxVerificationRules {
041
042  private final TestEnvironment env;
043
  /**
   * Creates the verification and stores the environment that all tests of this class operate on.
   *
   * @param env the {@link TestEnvironment} used by the tests, e.g. for timeouts and for
   *            recording / verifying asynchronous errors
   */
  protected SubscriberWhiteboxVerification(TestEnvironment env) {
    this.env = env;
  }
047
048  // USER API
049
  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
   *
   * In order to be meaningfully testable your Subscriber must inform the given
   * {@code WhiteboxSubscriberProbe} of the respective events having been received.
   *
   * @param probe the probe the created Subscriber has to notify about the signals it receives
   * @return the Subscriber instance to be tested
   */
  public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);
058
059  // ENV SETUP
060
061  /**
062   * Executor service used by the default provided asynchronous Publisher.
063   * @see #createHelperPublisher(long)
064   */
065  private ExecutorService publisherExecutor;
066  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
067  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
068  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
069
070  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
071
  /**
   * Runs before each test method and clears the asynchronous errors held by the
   * {@link TestEnvironment}.
   */
  @BeforeMethod
  public void setUp() throws Exception {
    env.clearAsyncErrors();
  }
076
077  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
078
079  @Test
080  public void required_exerciseWhiteboxHappyPath() throws Throwable {
081    subscriberTest(new TestStageTestRun() {
082      @Override
083      public void run(WhiteboxTestStage stage) throws InterruptedException {
084        stage.puppet().triggerRequest(1);
085        stage.puppet().triggerRequest(1);
086
087        long receivedRequests = stage.expectRequest();
088
089        stage.signalNext();
090        stage.probe.expectNext(stage.lastT);
091
092        stage.puppet().triggerRequest(1);
093        if (receivedRequests == 1) {
094          stage.expectRequest();
095        }
096
097        stage.signalNext();
098        stage.probe.expectNext(stage.lastT);
099
100        stage.puppet().signalCancel();
101        stage.expectCancelling();
102
103        stage.verifyNoAsyncErrors();
104      }
105    });
106  }
107
108  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
109
110  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
111  @Override @Test
112  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
113    subscriberTest(new TestStageTestRun() {
114      @Override
115      public void run(WhiteboxTestStage stage) throws InterruptedException {
116        stage.puppet().triggerRequest(1);
117        stage.expectRequest();
118
119        stage.signalNext();
120      }
121    });
122  }
123
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
  /**
   * Placeholder for Rule 2.2: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
129
130  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
131  @Override @Test
132  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
133    subscriberTestWithoutSetup(new TestStageTestRun() {
134      @Override
135      public void run(WhiteboxTestStage stage) throws Throwable {
136        final Subscription subs = new Subscription() {
137          @Override
138          public void request(long n) {
139            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
140            if (onCompleteStackTraceElement.isDefined()) {
141              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
142              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
143                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
144            }
145          }
146
147          @Override
148          public void cancel() {
149            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
150            if (onCompleteStackElement.isDefined()) {
151              final StackTraceElement stackElem = onCompleteStackElement.get();
152              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
153                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
154            }
155          }
156        };
157
158        stage.probe = stage.createWhiteboxSubscriberProbe(env);
159        final Subscriber<T> sub = createSubscriber(stage.probe);
160
161        sub.onSubscribe(subs);
162        sub.onComplete();
163
164        env.verifyNoAsyncErrorsNoDelay();
165      }
166    });
167  }
168
169  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
170  @Override @Test
171  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
172    subscriberTestWithoutSetup(new TestStageTestRun() {
173      @Override
174      public void run(WhiteboxTestStage stage) throws Throwable {
175        final Subscription subs = new Subscription() {
176          @Override
177          public void request(long n) {
178            Throwable thr = new Throwable();
179            for (StackTraceElement stackElem : thr.getStackTrace()) {
180              if (stackElem.getMethodName().equals("onError")) {
181                env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
182                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
183              }
184            }
185          }
186
187          @Override
188          public void cancel() {
189            Throwable thr = new Throwable();
190            for (StackTraceElement stackElem : thr.getStackTrace()) {
191              if (stackElem.getMethodName().equals("onError")) {
192                env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
193                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
194              }
195            }
196          }
197        };
198
199        stage.probe = stage.createWhiteboxSubscriberProbe(env);
200        final Subscriber<T> sub = createSubscriber(stage.probe);
201
202        sub.onSubscribe(subs);
203        sub.onError(new TestException());
204
205        env.verifyNoAsyncErrorsNoDelay();
206      }
207    });
208  }
209
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
  /**
   * Placeholder for Rule 2.4: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
215
216  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
217  @Override @Test
218  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
219    subscriberTest(new TestStageTestRun() {
220      @Override
221      public void run(WhiteboxTestStage stage) throws Throwable {
222        // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
223        final Latch secondSubscriptionCancelled = new Latch(env);
224        final Subscriber<? super T> sub = stage.sub();
225        final Subscription subscription = new Subscription() {
226          @Override
227          public void request(long elements) {
228            // ignore...
229          }
230
231          @Override
232          public void cancel() {
233            secondSubscriptionCancelled.close();
234          }
235
236          @Override
237          public String toString() {
238            return "SecondSubscription(should get cancelled)";
239          }
240        };
241        sub.onSubscribe(subscription);
242
243        secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
244        env.verifyNoAsyncErrors();
245      }
246    });
247  }
248
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
  /**
   * Placeholder for Rule 2.6: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
254
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
  /**
   * Placeholder for Rule 2.7: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
    // the same thread part of the clause can be verified but that is not very useful, or is it?
  }
261
262  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
263  @Override @Test
264  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
265    subscriberTest(new TestStageTestRun() {
266      @Override
267      public void run(WhiteboxTestStage stage) throws InterruptedException {
268        stage.puppet().triggerRequest(1);
269        stage.puppet().signalCancel();
270        stage.signalNext();
271
272        stage.puppet().triggerRequest(1);
273        stage.puppet().triggerRequest(1);
274
275        stage.verifyNoAsyncErrors();
276      }
277    });
278  }
279
280  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
281  @Override @Test
282  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
283    subscriberTest(new TestStageTestRun() {
284      @Override
285      public void run(WhiteboxTestStage stage) throws InterruptedException {
286        stage.puppet().triggerRequest(1);
287        stage.sendCompletion();
288        stage.probe.expectCompletion();
289
290        stage.verifyNoAsyncErrors();
291      }
292    });
293  }
294
295  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
296  @Override @Test
297  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
298    subscriberTest(new TestStageTestRun() {
299      @Override
300      public void run(WhiteboxTestStage stage) throws InterruptedException {
301        stage.sendCompletion();
302        stage.probe.expectCompletion();
303
304        stage.verifyNoAsyncErrors();
305      }
306    });
307  }
308
309  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
310  @Override @Test
311  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
312    subscriberTest(new TestStageTestRun() {
313      @Override
314      public void run(WhiteboxTestStage stage) throws InterruptedException {
315        stage.puppet().triggerRequest(1);
316        stage.puppet().triggerRequest(1);
317
318        Exception ex = new TestException();
319        stage.sendError(ex);
320        stage.probe.expectError(ex);
321
322        env.verifyNoAsyncErrorsNoDelay();
323      }
324    });
325  }
326
327  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
328  @Override @Test
329  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
330    subscriberTest(new TestStageTestRun() {
331      @Override
332      public void run(WhiteboxTestStage stage) throws InterruptedException {
333        Exception ex = new TestException();
334        stage.sendError(ex);
335        stage.probe.expectError(ex);
336
337        env.verifyNoAsyncErrorsNoDelay();
338      }
339    });
340  }
341
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
  /**
   * Placeholder for Rule 2.11: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
347
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
  /**
   * Placeholder for Rule 2.12: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
353
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
  /**
   * Placeholder for the "failing on signal invocation" part of Rule 2.13: nothing is verified
   * here, the test only calls {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec213_failingOnSignalInvocation() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
359
360  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
361  @Override @Test
362  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
363    subscriberTest(new TestStageTestRun() {
364      @Override
365      public void run(WhiteboxTestStage stage) throws Throwable {
366
367        final Subscriber<? super T> sub = stage.sub();
368        boolean gotNPE = false;
369        try {
370          sub.onSubscribe(null);
371        } catch (final NullPointerException expected) {
372          gotNPE = true;
373        }
374
375        assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
376        env.verifyNoAsyncErrorsNoDelay();
377      }
378    });
379  }
380
381  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
382  @Override @Test
383  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
384    subscriberTest(new TestStageTestRun() {
385      @Override
386      public void run(WhiteboxTestStage stage) throws Throwable {
387
388        final Subscriber<? super T> sub = stage.sub();
389        boolean gotNPE = false;
390        try {
391          sub.onNext(null);
392        } catch (final NullPointerException expected) {
393          gotNPE = true;
394        }
395
396        assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
397        env.verifyNoAsyncErrorsNoDelay();
398      }
399    });
400  }
401
402  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
403  @Override @Test
404  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
405    subscriberTest(new TestStageTestRun() {
406      @Override
407      public void run(WhiteboxTestStage stage) throws Throwable {
408
409          final Subscriber<? super T> sub = stage.sub();
410          boolean gotNPE = false;
411          try {
412            sub.onError(null);
413          } catch (final NullPointerException expected) {
414            gotNPE = true;
415          } finally {
416            assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
417          }
418
419        env.verifyNoAsyncErrorsNoDelay();
420      }
421    });
422  }
423
424
425  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
426
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
  /**
   * Placeholder for Rule 3.1: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
432
433  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
434  @Override @Test
435  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
436    subscriberTest(new TestStageTestRun() {
437      @Override
438      public void run(WhiteboxTestStage stage) throws InterruptedException {
439        stage.puppet().triggerRequest(2);
440        stage.probe.expectNext(stage.signalNext());
441        stage.probe.expectNext(stage.signalNext());
442
443        stage.probe.expectNone();
444        stage.puppet().triggerRequest(3);
445
446        stage.verifyNoAsyncErrors();
447      }
448    });
449  }
450
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
  /**
   * Placeholder for Rule 3.10: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
456
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
  /**
   * Placeholder for Rule 3.11: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
462
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
  /**
   * Placeholder for Rule 3.14: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
468
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
  /**
   * Placeholder for Rule 3.15: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
474
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
  /**
   * Placeholder for Rule 3.16: nothing is verified here, the test only calls
   * {@link #notVerified()}, which throws a {@code SkipException}.
   */
  @Override @Test
  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
480
481  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
482
483  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
484
  /**
   * Body of a single whitebox test; the test methods of this class implement it as an anonymous
   * class and pass it to {@code subscriberTest} or {@code subscriberTestWithoutSetup}.
   */
  abstract class TestStageTestRun {
    /**
     * Runs the test logic against the given stage.
     *
     * @param stage the stage used to signal to, and expect signals from, the Subscriber under test
     * @throws Throwable any failure raised by the test logic
     */
    public abstract void run(WhiteboxTestStage stage) throws Throwable;
  }
488
489  /**
490   * Prepares subscriber and publisher pair (by subscribing the first to the latter),
491   * and then hands over the tests {@link WhiteboxTestStage} over to the test.
492   *
493   * The test stage is, like in a puppet show, used to orchestrate what each participant should do.
494   * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
495   */
496  public void subscriberTest(TestStageTestRun body) throws Throwable {
497    WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
498    body.run(stage);
499  }
500
501  /**
502   * Provides a {@link WhiteboxTestStage} without performing any additional setup,
503   * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would.
504   *
505   * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
506   */
507  public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
508    WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
509    body.run(stage);
510  }
511
512  /**
513   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
514   */
515  public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
516    try {
517      subscriberTestWithoutSetup(body);
518    } catch (Exception ex) {
519      notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement.");
520    }
521  }
522
523  public class WhiteboxTestStage extends ManualPublisher<T> {
524    public Publisher<T> pub;
525    public ManualSubscriber<T> tees; // gives us access to a stream T values
526    public WhiteboxSubscriberProbe<T> probe;
527
528    public T lastT = null;
529
530    public WhiteboxTestStage(TestEnvironment env) throws InterruptedException {
531      this(env, true);
532    }
533
534    public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
535      super(env);
536      if (runDefaultInit) {
537        pub = this.createHelperPublisher(Long.MAX_VALUE);
538        tees = env.newManualSubscriber(pub);
539        probe = new WhiteboxSubscriberProbe<T>(env, subscriber);
540        subscribe(createSubscriber(probe));
541        probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
542        env.verifyNoAsyncErrorsNoDelay();
543      }
544    }
545
546    public Subscriber<? super T> sub() {
547      return subscriber.value();
548    }
549
550    public SubscriberPuppet puppet() {
551      return probe.puppet();
552    }
553
554    public WhiteboxSubscriberProbe<T> probe() {
555      return probe;
556    }
557
558    public Publisher<T> createHelperPublisher(long elements) {
559      return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
560    }
561
562    public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) {
563      return new WhiteboxSubscriberProbe<T>(env, subscriber);
564    }
565
566    public T signalNext() throws InterruptedException {
567      return signalNext(nextT());
568    }
569
570    private T signalNext(T element) throws InterruptedException {
571      sendNext(element);
572      return element;
573    }
574
575    public T nextT() throws InterruptedException {
576      lastT = tees.requestNextElement();
577      return lastT;
578    }
579
580    public void verifyNoAsyncErrors() {
581      env.verifyNoAsyncErrors();
582    }
583  }
584
585  /**
586   * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
587   * in order to allow intercepting calls on the underlying {@code Subscriber}.
588   * This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
589   */
590  public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
591
592    public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
593      super(env, Promise.<Subscriber<? super T>>completed(env, subscriber));
594    }
595
596    @Override
597    public void onSubscribe(Subscription s) {
598      sub().onSubscribe(s);
599    }
600
601    @Override
602    public void onNext(T t) {
603      registerOnNext(t);
604      sub().onNext(t);
605    }
606
607    @Override
608    public void onError(Throwable cause) {
609      registerOnError(cause);
610      sub().onError(cause);
611    }
612
613    @Override
614    public void onComplete() {
615      registerOnComplete();
616      sub().onComplete();
617    }
618  }
619
620  public static class BlackboxProbe<T> implements SubscriberProbe<T> {
621    protected final TestEnvironment env;
622    protected final Promise<Subscriber<? super T>> subscriber;
623
624    protected final Receptacle<T> elements;
625    protected final Promise<Throwable> error;
626
627    public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
628      this.env = env;
629      this.subscriber = subscriber;
630      elements = new Receptacle<T>(env);
631      error = new Promise<Throwable>(env);
632    }
633
634    @Override
635    public void registerOnNext(T element) {
636      elements.add(element);
637    }
638
639    @Override
640    public void registerOnComplete() {
641      try {
642        elements.complete();
643      } catch (IllegalStateException ex) {
644        // "Queue full", onComplete was already called
645        env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7");
646      }
647    }
648
649    @Override
650    public void registerOnError(Throwable cause) {
651      try {
652        error.complete(cause);
653      } catch (IllegalStateException ex) {
654        // "Queue full", onError was already called
655        env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7");
656      }
657    }
658
659    public T expectNext() throws InterruptedException {
660      return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub()));
661    }
662
663    public void expectNext(T expected) throws InterruptedException {
664      expectNext(expected, env.defaultTimeoutMillis());
665    }
666
667    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
668      T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
669      if (!received.equals(expected)) {
670        env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
671      }
672    }
673
674    public Subscriber<? super T> sub() {
675      return subscriber.value();
676    }
677
678    public void expectCompletion() throws InterruptedException {
679      expectCompletion(env.defaultTimeoutMillis());
680    }
681
682    public void expectCompletion(long timeoutMillis) throws InterruptedException {
683      expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
684    }
685
686    public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException {
687      elements.expectCompletion(timeoutMillis, msg);
688    }
689
690    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
691    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
692      final E err = expectError(expected);
693      String message = err.getMessage();
694      assertTrue(message.contains(requiredMessagePart),
695        String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected));
696    }
697
698    public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException {
699      return expectError(expected, env.defaultTimeoutMillis());
700    }
701
702    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
703    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
704      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
705      if (error.value() == null) {
706        return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
707      } else if (expected.isInstance(error.value())) {
708        return (E) error.value();
709      } else {
710        return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
711      }
712    }
713
714    public void expectError(Throwable expected) throws InterruptedException {
715      expectError(expected, env.defaultTimeoutMillis());
716    }
717
718    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
719    public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
720      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
721      if (error.value() != expected) {
722        env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
723      }
724    }
725
726    public void expectNone() throws InterruptedException {
727      expectNone(env.defaultNoSignalsTimeoutMillis());
728    }
729
730    public void expectNone(long withinMillis) throws InterruptedException {
731      elements.expectNone(withinMillis, "Expected nothing");
732    }
733
734  }
735
736  public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
737    protected Promise<SubscriberPuppet> puppet;
738
739    public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
740      super(env, subscriber);
741      puppet = new Promise<SubscriberPuppet>(env);
742    }
743
744    private SubscriberPuppet puppet() {
745      return puppet.value();
746    }
747
748    @Override
749    public void registerOnSubscribe(SubscriberPuppet p) {
750      if (!puppet.isCompleted()) {
751        puppet.complete(p);
752      }
753    }
754
755  }
756
  /**
   * Callback through which the Subscriber under test hands its {@link SubscriberPuppet} to the test suite.
   */
  public interface SubscriberPuppeteer {

    /**
     * Must be called by the test subscriber when it has successfully registered a subscription
     * inside the {@code onSubscribe} method.
     *
     * @param puppet the puppet the test suite uses to trigger actions on the Subscriber
     */
    void registerOnSubscribe(SubscriberPuppet puppet);
  }
765
  /**
   * Callbacks through which the Subscriber under test reports the signals it has received.
   */
  public interface SubscriberProbe<T> {

    /**
     * Must be called by the test subscriber when it has received an {@code onNext} event.
     *
     * @param element the element received in {@code onNext}
     */
    void registerOnNext(T element);

    /**
     * Must be called by the test subscriber when it has received an {@code onComplete} event.
     */
    void registerOnComplete();

    /**
     * Must be called by the test subscriber when it has received an {@code onError} event.
     *
     * @param cause the error received in {@code onError}
     */
    void registerOnError(Throwable cause);

  }
784
  /**
   * Implement this puppet in your Whitebox style tests.
   * The test suite will invoke the specific trigger/signal methods requesting you to execute the specific action.
   * Since this is a whitebox style test, you're allowed and expected to use knowledge about your implementation to
   * implement these calls.
   */
  public interface SubscriberPuppet {
    /**
     * Trigger {@code request(elements)} on your {@link Subscriber}
     *
     * @param elements the number of elements to request
     */
    void triggerRequest(long elements);

    /**
     * Trigger {@code cancel()} on your {@link Subscriber}
     */
    void signalCancel();
  }
802
  /**
   * Marks the calling test as not verified by throwing a {@link SkipException} with a
   * standard message.
   *
   * @throws SkipException always
   */
  public void notVerified() {
    throw new SkipException("Not verified using this TCK.");
  }
806
  /**
   * Marks the calling test as not verified by throwing a {@link SkipException} carrying the
   * given message.
   *
   * @param msg the message of the thrown {@link SkipException}
   * @throws SkipException always
   */
  public void notVerified(String msg) {
    throw new SkipException(msg);
  }
810}