001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.TestEnvironment.*;
018import org.reactivestreams.tck.support.Optional;
019import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
020import org.reactivestreams.tck.support.TestException;
021import org.testng.SkipException;
022import org.testng.annotations.AfterClass;
023import org.testng.annotations.BeforeClass;
024import org.testng.annotations.BeforeMethod;
025import org.testng.annotations.Test;
026
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029
030import static org.testng.Assert.assertTrue;
031
032/**
033 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} specification rules.
034 *
035 * @see org.reactivestreams.Subscriber
036 * @see org.reactivestreams.Subscription
037 */
038public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T>
039  implements SubscriberWhiteboxVerificationRules {
040
041  private final TestEnvironment env;
042
043  protected SubscriberWhiteboxVerification(TestEnvironment env) {
044    this.env = env;
045  }
046
047  // USER API
048
049  /**
050   * This is the main method you must implement in your test incarnation.
051   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
052   *
053   * In order to be meaningfully testable your Subscriber must inform the given
054   * `WhiteboxSubscriberProbe` of the respective events having been received.
055   */
056  public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);
057
058  // ENV SETUP
059
060  /**
061   * Executor service used by the default provided asynchronous Publisher.
062   * @see #createHelperPublisher(long)
063   */
064  private ExecutorService publisherExecutor;
065  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
066  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
067  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
068
069  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
070
071  @BeforeMethod
072  public void setUp() throws Exception {
073    env.clearAsyncErrors();
074  }
075
076  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
077
078  @Test
079  public void required_exerciseWhiteboxHappyPath() throws Throwable {
080    subscriberTest(new TestStageTestRun() {
081      @Override
082      public void run(WhiteboxTestStage stage) throws InterruptedException {
083        stage.puppet().triggerRequest(1);
084        stage.puppet().triggerRequest(1);
085
086        long receivedRequests = stage.expectRequest();
087
088        stage.signalNext();
089        stage.probe.expectNext(stage.lastT);
090
091        stage.puppet().triggerRequest(1);
092        if (receivedRequests == 1) {
093          stage.expectRequest();
094        }
095
096        stage.signalNext();
097        stage.probe.expectNext(stage.lastT);
098
099        stage.puppet().signalCancel();
100        stage.expectCancelling();
101
102        stage.verifyNoAsyncErrors();
103      }
104    });
105  }
106
107  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
108
109  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
110  @Override @Test
111  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
112    subscriberTest(new TestStageTestRun() {
113      @Override
114      public void run(WhiteboxTestStage stage) throws InterruptedException {
115        stage.puppet().triggerRequest(1);
116        stage.expectRequest();
117
118        stage.signalNext();
119      }
120    });
121  }
122
123  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
124  @Override @Test
125  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
126    notVerified(); // cannot be meaningfully tested, or can it?
127  }
128
129  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
130  @Override @Test
131  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
132    subscriberTestWithoutSetup(new TestStageTestRun() {
133      @Override
134      public void run(WhiteboxTestStage stage) throws Throwable {
135        final Subscription subs = new Subscription() {
136          @Override
137          public void request(long n) {
138            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
139            if (onCompleteStackTraceElement.isDefined()) {
140              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
141              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
142                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
143            }
144          }
145
146          @Override
147          public void cancel() {
148            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
149            if (onCompleteStackElement.isDefined()) {
150              final StackTraceElement stackElem = onCompleteStackElement.get();
151              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
152                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
153            }
154          }
155        };
156
157        stage.probe = stage.createWhiteboxSubscriberProbe(env);
158        final Subscriber<T> sub = createSubscriber(stage.probe);
159
160        sub.onSubscribe(subs);
161        sub.onComplete();
162
163        env.verifyNoAsyncErrorsNoDelay();
164      }
165    });
166  }
167
168  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
169  @Override @Test
170  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
171    subscriberTestWithoutSetup(new TestStageTestRun() {
172      @Override
173      public void run(WhiteboxTestStage stage) throws Throwable {
174        final Subscription subs = new Subscription() {
175          @Override
176          public void request(long n) {
177            Throwable thr = new Throwable();
178            for (StackTraceElement stackElem : thr.getStackTrace()) {
179              if (stackElem.getMethodName().equals("onError")) {
180                env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
181                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
182              }
183            }
184          }
185
186          @Override
187          public void cancel() {
188            Throwable thr = new Throwable();
189            for (StackTraceElement stackElem : thr.getStackTrace()) {
190              if (stackElem.getMethodName().equals("onError")) {
191                env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
192                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
193              }
194            }
195          }
196        };
197
198        stage.probe = stage.createWhiteboxSubscriberProbe(env);
199        final Subscriber<T> sub = createSubscriber(stage.probe);
200
201        sub.onSubscribe(subs);
202        sub.onError(new TestException());
203
204        env.verifyNoAsyncErrorsNoDelay();
205      }
206    });
207  }
208
209  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
210  @Override @Test
211  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
212    notVerified(); // cannot be meaningfully tested, or can it?
213  }
214
215  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
216  @Override @Test
217  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
218    subscriberTest(new TestStageTestRun() {
219      @Override
220      public void run(WhiteboxTestStage stage) throws Throwable {
221        // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
222        final Latch secondSubscriptionCancelled = new Latch(env);
223        final Subscriber<? super T> sub = stage.sub();
224        final Subscription subscription = new Subscription() {
225          @Override
226          public void request(long elements) {
227            // ignore...
228          }
229
230          @Override
231          public void cancel() {
232            secondSubscriptionCancelled.close();
233          }
234
235          @Override
236          public String toString() {
237            return "SecondSubscription(should get cancelled)";
238          }
239        };
240        sub.onSubscribe(subscription);
241
242        secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
243        env.verifyNoAsyncErrors();
244      }
245    });
246  }
247
248  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
249  @Override @Test
250  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
251    notVerified(); // cannot be meaningfully tested, or can it?
252  }
253
254  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
255  @Override @Test
256  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
257    notVerified(); // cannot be meaningfully tested, or can it?
258    // the same thread part of the clause can be verified but that is not very useful, or is it?
259  }
260
261  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
262  @Override @Test
263  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
264    subscriberTest(new TestStageTestRun() {
265      @Override
266      public void run(WhiteboxTestStage stage) throws InterruptedException {
267        stage.puppet().triggerRequest(1);
268        stage.puppet().signalCancel();
269        stage.signalNext();
270
271        stage.puppet().triggerRequest(1);
272        stage.puppet().triggerRequest(1);
273
274        stage.verifyNoAsyncErrors();
275      }
276    });
277  }
278
279  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
280  @Override @Test
281  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
282    subscriberTest(new TestStageTestRun() {
283      @Override
284      public void run(WhiteboxTestStage stage) throws InterruptedException {
285        stage.puppet().triggerRequest(1);
286        stage.sendCompletion();
287        stage.probe.expectCompletion();
288
289        stage.verifyNoAsyncErrors();
290      }
291    });
292  }
293
294  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
295  @Override @Test
296  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
297    subscriberTest(new TestStageTestRun() {
298      @Override
299      public void run(WhiteboxTestStage stage) throws InterruptedException {
300        stage.sendCompletion();
301        stage.probe.expectCompletion();
302
303        stage.verifyNoAsyncErrors();
304      }
305    });
306  }
307
308  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
309  @Override @Test
310  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
311    subscriberTest(new TestStageTestRun() {
312      @Override
313      public void run(WhiteboxTestStage stage) throws InterruptedException {
314        stage.puppet().triggerRequest(1);
315        stage.puppet().triggerRequest(1);
316
317        Exception ex = new TestException();
318        stage.sendError(ex);
319        stage.probe.expectError(ex);
320
321        env.verifyNoAsyncErrorsNoDelay();
322      }
323    });
324  }
325
326  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
327  @Override @Test
328  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
329    subscriberTest(new TestStageTestRun() {
330      @Override
331      public void run(WhiteboxTestStage stage) throws InterruptedException {
332        Exception ex = new TestException();
333        stage.sendError(ex);
334        stage.probe.expectError(ex);
335
336        env.verifyNoAsyncErrorsNoDelay();
337      }
338    });
339  }
340
341  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
342  @Override @Test
343  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
344    notVerified(); // cannot be meaningfully tested, or can it?
345  }
346
347  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
348  @Override @Test
349  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
350    notVerified(); // cannot be meaningfully tested, or can it?
351  }
352
353  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
354  @Override @Test
355  public void untested_spec213_failingOnSignalInvocation() throws Exception {
356    notVerified(); // cannot be meaningfully tested, or can it?
357  }
358
359  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
360  @Override @Test
361  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
362    subscriberTest(new TestStageTestRun() {
363      @Override
364      public void run(WhiteboxTestStage stage) throws Throwable {
365
366        final Subscriber<? super T> sub = stage.sub();
367        boolean gotNPE = false;
368        try {
369          sub.onSubscribe(null);
370        } catch (final NullPointerException expected) {
371          gotNPE = true;
372        } 
373
374        assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
375        env.verifyNoAsyncErrorsNoDelay();
376      }
377    });
378  }
379
380  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
381  @Override @Test
382  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
383    subscriberTest(new TestStageTestRun() {
384      @Override
385      public void run(WhiteboxTestStage stage) throws Throwable {
386
387        final Subscriber<? super T> sub = stage.sub();
388        boolean gotNPE = false;
389        try {
390          sub.onNext(null);
391        } catch (final NullPointerException expected) {
392          gotNPE = true;
393        }
394
395        assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
396        env.verifyNoAsyncErrorsNoDelay();
397      }
398    });
399  }
400
401  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
402  @Override @Test
403  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
404    subscriberTest(new TestStageTestRun() {
405      @Override
406      public void run(WhiteboxTestStage stage) throws Throwable {
407
408          final Subscriber<? super T> sub = stage.sub();
409          boolean gotNPE = false;
410          try {
411            sub.onError(null);
412          } catch (final NullPointerException expected) {
413            gotNPE = true;
414          } finally {
415            assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
416          }
417
418        env.verifyNoAsyncErrorsNoDelay();
419      }
420    });
421  }
422
423
424  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
425
426  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
427  @Override @Test
428  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
429    notVerified(); // cannot be meaningfully tested, or can it?
430  }
431
432  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
433  @Override @Test
434  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
435    subscriberTest(new TestStageTestRun() {
436      @Override
437      public void run(WhiteboxTestStage stage) throws InterruptedException {
438        stage.puppet().triggerRequest(2);
439        stage.probe.expectNext(stage.signalNext());
440        stage.probe.expectNext(stage.signalNext());
441
442        stage.probe.expectNone();
443        stage.puppet().triggerRequest(3);
444
445        stage.verifyNoAsyncErrors();
446      }
447    });
448  }
449
450  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
451  @Override @Test
452  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
453    notVerified(); // cannot be meaningfully tested, or can it?
454  }
455
456  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
457  @Override @Test
458  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
459    notVerified(); // cannot be meaningfully tested, or can it?
460  }
461
462  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
463  @Override @Test
464  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
465    notVerified(); // cannot be meaningfully tested, or can it?
466  }
467
468  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
469  @Override @Test
470  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
471    notVerified(); // cannot be meaningfully tested, or can it?
472  }
473
474  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
475  @Override @Test
476  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
477    notVerified(); // cannot be meaningfully tested, or can it?
478  }
479
480  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
481
482  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
483
484  abstract class TestStageTestRun {
485    public abstract void run(WhiteboxTestStage stage) throws Throwable;
486  }
487
488  /**
489   * Prepares subscriber and publisher pair (by subscribing the first to the latter),
490   * and then hands over the tests {@link WhiteboxTestStage} over to the test.
491   *
492   * The test stage is, like in a puppet show, used to orchestrate what each participant should do.
493   * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
494   */
495  public void subscriberTest(TestStageTestRun body) throws Throwable {
496    WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
497    body.run(stage);
498  }
499
500  /**
501   * Provides a {@link WhiteboxTestStage} without performing any additional setup,
502   * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would.
503   *
504   * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
505   */
506  public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
507    WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
508    body.run(stage);
509  }
510
511  /**
512   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
513   */
514  public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
515    try {
516      subscriberTestWithoutSetup(body);
517    } catch (Exception ex) {
518      notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement.");
519    }
520  }
521
522  public class WhiteboxTestStage extends ManualPublisher<T> {
523    public Publisher<T> pub;
524    public ManualSubscriber<T> tees; // gives us access to a stream T values
525    public WhiteboxSubscriberProbe<T> probe;
526
527    public T lastT = null;
528
529    public WhiteboxTestStage(TestEnvironment env) throws InterruptedException {
530      this(env, true);
531    }
532
533    public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
534      super(env);
535      if (runDefaultInit) {
536        pub = this.createHelperPublisher(Long.MAX_VALUE);
537        tees = env.newManualSubscriber(pub);
538        probe = new WhiteboxSubscriberProbe<T>(env, subscriber);
539        subscribe(createSubscriber(probe));
540        probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
541      }
542    }
543
544    public Subscriber<? super T> sub() {
545      return subscriber.value();
546    }
547
548    public SubscriberPuppet puppet() {
549      return probe.puppet();
550    }
551
552    public WhiteboxSubscriberProbe<T> probe() {
553      return probe;
554    }
555
556    public Publisher<T> createHelperPublisher(long elements) {
557      return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
558    }
559
560    public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) {
561      return new WhiteboxSubscriberProbe<T>(env, subscriber);
562    }
563
564    public T signalNext() throws InterruptedException {
565      return signalNext(nextT());
566    }
567
568    private T signalNext(T element) throws InterruptedException {
569      sendNext(element);
570      return element;
571    }
572
573    public T nextT() throws InterruptedException {
574      lastT = tees.requestNextElement();
575      return lastT;
576    }
577
578    public void verifyNoAsyncErrors() {
579      env.verifyNoAsyncErrors();
580    }
581  }
582
583  /**
584   * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
585   * in order to allow intercepting calls on the underlying {@code Subscriber}.
586   * This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
587   */
588  public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
589
590    public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
591      super(env, Promise.<Subscriber<? super T>>completed(env, subscriber));
592    }
593
594    @Override
595    public void onSubscribe(Subscription s) {
596      sub().onSubscribe(s);
597    }
598
599    @Override
600    public void onNext(T t) {
601      registerOnNext(t);
602      sub().onNext(t);
603    }
604
605    @Override
606    public void onError(Throwable cause) {
607      registerOnError(cause);
608      sub().onError(cause);
609    }
610
611    @Override
612    public void onComplete() {
613      registerOnComplete();
614      sub().onComplete();
615    }
616  }
617
618  public static class BlackboxProbe<T> implements SubscriberProbe<T> {
619    protected final TestEnvironment env;
620    protected final Promise<Subscriber<? super T>> subscriber;
621
622    protected final Receptacle<T> elements;
623    protected final Promise<Throwable> error;
624
625    public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
626      this.env = env;
627      this.subscriber = subscriber;
628      elements = new Receptacle<T>(env);
629      error = new Promise<Throwable>(env);
630    }
631
632    @Override
633    public void registerOnNext(T element) {
634      elements.add(element);
635    }
636
637    @Override
638    public void registerOnComplete() {
639      try {
640        elements.complete();
641      } catch (IllegalStateException ex) {
642        // "Queue full", onComplete was already called
643        env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7");
644      }
645    }
646
647    @Override
648    public void registerOnError(Throwable cause) {
649      try {
650        error.complete(cause);
651      } catch (IllegalStateException ex) {
652        // "Queue full", onError was already called
653        env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7");
654      }
655    }
656
657    public T expectNext() throws InterruptedException {
658      return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub()));
659    }
660
661    public void expectNext(T expected) throws InterruptedException {
662      expectNext(expected, env.defaultTimeoutMillis());
663    }
664
665    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
666      T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
667      if (!received.equals(expected)) {
668        env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
669      }
670    }
671
672    public Subscriber<? super T> sub() {
673      return subscriber.value();
674    }
675
676    public void expectCompletion() throws InterruptedException {
677      expectCompletion(env.defaultTimeoutMillis());
678    }
679
680    public void expectCompletion(long timeoutMillis) throws InterruptedException {
681      expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
682    }
683
684    public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException {
685      elements.expectCompletion(timeoutMillis, msg);
686    }
687
688    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
689    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
690      final E err = expectError(expected);
691      String message = err.getMessage();
692      assertTrue(message.contains(requiredMessagePart),
693        String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected));
694    }
695
696    public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException {
697      return expectError(expected, env.defaultTimeoutMillis());
698    }
699
700    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
701    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
702      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
703      if (error.value() == null) {
704        return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
705      } else if (expected.isInstance(error.value())) {
706        return (E) error.value();
707      } else {
708        return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
709      }
710    }
711
712    public void expectError(Throwable expected) throws InterruptedException {
713      expectError(expected, env.defaultTimeoutMillis());
714    }
715
716    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
717    public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
718      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
719      if (error.value() != expected) {
720        env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
721      }
722    }
723
724    public void expectNone() throws InterruptedException {
725      expectNone(env.defaultNoSignalsTimeoutMillis());
726    }
727
728    public void expectNone(long withinMillis) throws InterruptedException {
729      elements.expectNone(withinMillis, "Expected nothing");
730    }
731
732  }
733
734  public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
735    protected Promise<SubscriberPuppet> puppet;
736
737    public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
738      super(env, subscriber);
739      puppet = new Promise<SubscriberPuppet>(env);
740    }
741
742    private SubscriberPuppet puppet() {
743      return puppet.value();
744    }
745
746    @Override
747    public void registerOnSubscribe(SubscriberPuppet p) {
748      if (!puppet.isCompleted()) {
749        puppet.complete(p);
750      } 
751    }
752
753  }
754
755  public interface SubscriberPuppeteer {
756
757    /**
758     * Must be called by the test subscriber when it has successfully registered a subscription
759     * inside the `onSubscribe` method.
760     */
761    void registerOnSubscribe(SubscriberPuppet puppet);
762  }
763
764  public interface SubscriberProbe<T> {
765
766    /**
767     * Must be called by the test subscriber when it has received an`onNext` event.
768     */
769    void registerOnNext(T element);
770
771    /**
772     * Must be called by the test subscriber when it has received an `onComplete` event.
773     */
774    void registerOnComplete();
775
776    /**
777     * Must be called by the test subscriber when it has received an `onError` event.
778     */
779    void registerOnError(Throwable cause);
780
781  }
782
783  public interface SubscriberPuppet {
784    void triggerRequest(long elements);
785
786    void signalCancel();
787  }
788
789  public void notVerified() {
790    throw new SkipException("Not verified using this TCK.");
791  }
792
793  public void notVerified(String msg) {
794    throw new SkipException(msg);
795  }
796}