001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams.tck;
006
007import org.reactivestreams.Publisher;
008import org.reactivestreams.Subscriber;
009import org.reactivestreams.Subscription;
010import org.reactivestreams.tck.TestEnvironment.*;
011import org.reactivestreams.tck.flow.support.Optional;
012import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
013import org.reactivestreams.tck.flow.support.TestException;
014import org.testng.SkipException;
015import org.testng.annotations.AfterClass;
016import org.testng.annotations.BeforeClass;
017import org.testng.annotations.BeforeMethod;
018import org.testng.annotations.Test;
019
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.Executors;
022
023import static org.testng.Assert.assertTrue;
024
025/**
026 * Provides whitebox style tests for verifying {@link org.reactivestreams.Subscriber}
027 * and {@link org.reactivestreams.Subscription} specification rules.
028 *
029 * @see org.reactivestreams.Subscriber
030 * @see org.reactivestreams.Subscription
031 */
032public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublisher<T>
033  implements SubscriberWhiteboxVerificationRules {
034
/** Test environment handed to every stage and probe created by this verification. */
private final TestEnvironment env;

/**
 * Creates the verification bound to the given environment.
 *
 * @param env environment whose timeouts and asynchronous-error checks the tests in this class use
 */
protected SubscriberWhiteboxVerification(TestEnvironment env) {
  this.env = env;
}
040
041  // USER API
042
/**
 * This is the main method you must implement in your test incarnation.
 * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
 * <p>
 * In order to be meaningfully testable your Subscriber must inform the given
 * {@link WhiteboxSubscriberProbe} of the respective events having been received.
 *
 * @param probe probe the created Subscriber must report its received signals to
 * @return the Subscriber under test
 */
public abstract Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe);
051
052  // ENV SETUP
053
054  /**
055   * Executor service used by the default provided asynchronous Publisher.
056   * @see #createHelperPublisher(long)
057   */
058  private ExecutorService publisherExecutor;
059  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
060  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
061  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
062
063  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
064
065  @BeforeMethod
066  public void setUp() throws Exception {
067    env.clearAsyncErrors();
068  }
069
070  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
071
072  @Test
073  public void required_exerciseWhiteboxHappyPath() throws Throwable {
074    subscriberTest(new TestStageTestRun() {
075      @Override
076      public void run(WhiteboxTestStage stage) throws InterruptedException {
077        stage.puppet().triggerRequest(1);
078        stage.puppet().triggerRequest(1);
079
080        long receivedRequests = stage.expectRequest();
081
082        stage.signalNext();
083        stage.probe.expectNext(stage.lastT);
084
085        stage.puppet().triggerRequest(1);
086        if (receivedRequests == 1) {
087          stage.expectRequest();
088        }
089
090        stage.signalNext();
091        stage.probe.expectNext(stage.lastT);
092
093        stage.puppet().signalCancel();
094        stage.expectCancelling();
095
096        stage.verifyNoAsyncErrors();
097      }
098    });
099  }
100
101  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
102
103  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
104  @Override @Test
105  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
106    subscriberTest(new TestStageTestRun() {
107      @Override
108      public void run(WhiteboxTestStage stage) throws InterruptedException {
109        stage.puppet().triggerRequest(1);
110        stage.expectRequest();
111
112        stage.signalNext();
113      }
114    });
115  }
116
117  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
118  @Override @Test
119  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
120    notVerified(); // cannot be meaningfully tested, or can it?
121  }
122
123  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
124  @Override @Test
125  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
126    subscriberTestWithoutSetup(new TestStageTestRun() {
127      @Override
128      public void run(WhiteboxTestStage stage) throws Throwable {
129        final String onCompleteMethod = "required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call";
130        
131        final Subscription subs = new Subscription() {
132          @Override
133          public void request(long n) {
134            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace(onCompleteMethod);
135            if (onCompleteStackTraceElement.isDefined()) {
136              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
137              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
138                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
139            }
140          }
141
142          @Override
143          public void cancel() {
144            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onCompleteMethod);
145            if (onCompleteStackElement.isDefined()) {
146              final StackTraceElement stackElem = onCompleteStackElement.get();
147              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
148                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
149            }
150          }
151        };
152
153        stage.probe = stage.createWhiteboxSubscriberProbe(env);
154        final Subscriber<T> sub = createSubscriber(stage.probe);
155
156        sub.onSubscribe(subs);
157        required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(sub);
158
159        env.verifyNoAsyncErrorsNoDelay();
160      }
161
162      /** Makes sure the onComplete is initiated with a recognizable stacktrace element on the current thread. */
163      void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(Subscriber<?> sub) {
164        sub.onComplete();
165      }
166    });
167  }
168
169  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
170  @Override @Test
171  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
172    subscriberTestWithoutSetup(new TestStageTestRun() {
173      @Override
174      public void run(WhiteboxTestStage stage) throws Throwable {
175        final String onErrorMethod = "required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call";
176
177        final Subscription subs = new Subscription() {
178          @Override
179          public void request(long n) {
180            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod);
181            if (onCompleteStackElement.isDefined()) {
182              final StackTraceElement stackElem = onCompleteStackElement.get();
183              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
184                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
185            }
186          }
187
188          @Override
189          public void cancel() {
190            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod);
191            if (onCompleteStackElement.isDefined()) {
192              final StackTraceElement stackElem = onCompleteStackElement.get();
193              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
194                                      stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
195            }
196          }
197        };
198
199        stage.probe = stage.createWhiteboxSubscriberProbe(env);
200        final Subscriber<T> sub = createSubscriber(stage.probe);
201
202        sub.onSubscribe(subs);
203        required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(sub);
204
205        env.verifyNoAsyncErrorsNoDelay();
206      }
207
208      /** Makes sure the onError is initiated with a recognizable stacktrace element on the current thread. */
209      void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(Subscriber<?> sub) {
210        sub.onError(new TestException());
211      }
212    });
213  }
214
215  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
216  @Override @Test
217  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
218    notVerified(); // cannot be meaningfully tested, or can it?
219  }
220
221  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
222  @Override @Test
223  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
224    subscriberTest(new TestStageTestRun() {
225      @Override
226      public void run(WhiteboxTestStage stage) throws Throwable {
227        // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
228        final Latch secondSubscriptionCancelled = new Latch(env);
229        final Subscriber<? super T> sub = stage.sub();
230        final Subscription subscription = new Subscription() {
231          @Override
232          public void request(long elements) {
233            // ignore...
234          }
235
236          @Override
237          public void cancel() {
238            secondSubscriptionCancelled.close();
239          }
240
241          @Override
242          public String toString() {
243            return "SecondSubscription(should get cancelled)";
244          }
245        };
246        sub.onSubscribe(subscription);
247
248        secondSubscriptionCancelled.expectClose("Expected 2nd Subscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called");
249        env.verifyNoAsyncErrors();
250      }
251    });
252  }
253
254  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
255  @Override @Test
256  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
257    notVerified(); // cannot be meaningfully tested, or can it?
258  }
259
260  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
261  @Override @Test
262  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
263    notVerified(); // cannot be meaningfully tested, or can it?
264    // the same thread part of the clause can be verified but that is not very useful, or is it?
265  }
266
267  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
268  @Override @Test
269  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
270    subscriberTest(new TestStageTestRun() {
271      @Override
272      public void run(WhiteboxTestStage stage) throws InterruptedException {
273        stage.puppet().triggerRequest(1);
274        stage.expectRequest();
275        stage.puppet().signalCancel();
276        stage.expectCancelling();
277        stage.signalNext();
278
279        stage.puppet().triggerRequest(1);
280        stage.puppet().triggerRequest(1);
281
282        stage.verifyNoAsyncErrors();
283      }
284    });
285  }
286
287  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
288  @Override @Test
289  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
290    subscriberTest(new TestStageTestRun() {
291      @Override
292      public void run(WhiteboxTestStage stage) throws InterruptedException {
293        stage.puppet().triggerRequest(1);
294        stage.sendCompletion();
295        stage.probe.expectCompletion();
296
297        stage.verifyNoAsyncErrors();
298      }
299    });
300  }
301
302  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
303  @Override @Test
304  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
305    subscriberTest(new TestStageTestRun() {
306      @Override
307      public void run(WhiteboxTestStage stage) throws InterruptedException {
308        stage.sendCompletion();
309        stage.probe.expectCompletion();
310
311        stage.verifyNoAsyncErrors();
312      }
313    });
314  }
315
316  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
317  @Override @Test
318  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
319    subscriberTest(new TestStageTestRun() {
320      @Override
321      public void run(WhiteboxTestStage stage) throws InterruptedException {
322        stage.puppet().triggerRequest(1);
323        stage.puppet().triggerRequest(1);
324
325        Exception ex = new TestException();
326        stage.sendError(ex);
327        stage.probe.expectError(ex);
328
329        env.verifyNoAsyncErrorsNoDelay();
330      }
331    });
332  }
333
334  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
335  @Override @Test
336  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
337    subscriberTest(new TestStageTestRun() {
338      @Override
339      public void run(WhiteboxTestStage stage) throws InterruptedException {
340        Exception ex = new TestException();
341        stage.sendError(ex);
342        stage.probe.expectError(ex);
343
344        env.verifyNoAsyncErrorsNoDelay();
345      }
346    });
347  }
348
349  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
350  @Override @Test
351  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
352    notVerified(); // cannot be meaningfully tested, or can it?
353  }
354
355  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
356  @Override @Test
357  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
358    notVerified(); // cannot be meaningfully tested, or can it?
359  }
360
361  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
362  @Override @Test
363  public void untested_spec213_failingOnSignalInvocation() throws Exception {
364    notVerified(); // cannot be meaningfully tested, or can it?
365  }
366
367  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
368  @Override @Test
369  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
370    subscriberTest(new TestStageTestRun() {
371      @Override
372      public void run(WhiteboxTestStage stage) throws Throwable {
373
374        final Subscriber<? super T> sub = stage.sub();
375        boolean gotNPE = false;
376        try {
377          sub.onSubscribe(null);
378        } catch (final NullPointerException expected) {
379          gotNPE = true;
380        }
381
382        assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
383        env.verifyNoAsyncErrorsNoDelay();
384      }
385    });
386  }
387
388  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
389  @Override @Test
390  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
391    subscriberTest(new TestStageTestRun() {
392      @Override
393      public void run(WhiteboxTestStage stage) throws Throwable {
394
395        final Subscriber<? super T> sub = stage.sub();
396        boolean gotNPE = false;
397        try {
398          sub.onNext(null);
399        } catch (final NullPointerException expected) {
400          gotNPE = true;
401        }
402
403        assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
404        env.verifyNoAsyncErrorsNoDelay();
405      }
406    });
407  }
408
409  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
410  @Override @Test
411  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
412    subscriberTest(new TestStageTestRun() {
413      @Override
414      public void run(WhiteboxTestStage stage) throws Throwable {
415
416          final Subscriber<? super T> sub = stage.sub();
417          boolean gotNPE = false;
418          try {
419            sub.onError(null);
420          } catch (final NullPointerException expected) {
421            gotNPE = true;
422          } finally {
423            assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
424          }
425
426        env.verifyNoAsyncErrorsNoDelay();
427      }
428    });
429  }
430
431
432  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
433
434  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
435  @Override @Test
436  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
437    notVerified(); // cannot be meaningfully tested, or can it?
438  }
439
440  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
441  @Override @Test
442  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
443    subscriberTest(new TestStageTestRun() {
444      @Override
445      public void run(WhiteboxTestStage stage) throws InterruptedException {
446        stage.puppet().triggerRequest(2);
447        long requestedElements = stage.expectRequest();
448        stage.probe.expectNext(stage.signalNext());
449        // Some subscribers may only request one element at a time.
450        if (requestedElements < 2) {
451          stage.expectRequest();
452        }
453        stage.probe.expectNext(stage.signalNext());
454
455        stage.probe.expectNone();
456        stage.puppet().triggerRequest(3);
457
458        stage.verifyNoAsyncErrors();
459      }
460    });
461  }
462
463  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
464  @Override @Test
465  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
466    notVerified(); // cannot be meaningfully tested, or can it?
467  }
468
469  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
470  @Override @Test
471  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
472    notVerified(); // cannot be meaningfully tested, or can it?
473  }
474
475  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
476  @Override @Test
477  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
478    notVerified(); // cannot be meaningfully tested, or can it?
479  }
480
481  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
482  @Override @Test
483  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
484    notVerified(); // cannot be meaningfully tested, or can it?
485  }
486
487  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
488  @Override @Test
489  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
490    notVerified(); // cannot be meaningfully tested, or can it?
491  }
492
493  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
494
495  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
496
/**
 * Body of a single whitebox test. Instances are passed to {@code subscriberTest} or
 * {@code subscriberTestWithoutSetup}, which create a {@link WhiteboxTestStage} and invoke {@link #run}.
 */
abstract class TestStageTestRun {
  /**
   * Executes the test logic against the given stage.
   *
   * @param stage the stage created for this run
   * @throws Throwable any failure raised by the test logic
   */
  public abstract void run(WhiteboxTestStage stage) throws Throwable;
}
500
501  /**
502   * Prepares subscriber and publisher pair (by subscribing the first to the latter),
503   * and then hands over the tests {@link WhiteboxTestStage} over to the test.
504   *
505   * The test stage is, like in a puppet show, used to orchestrate what each participant should do.
506   * Since this is a whitebox test, this allows the stage to completely control when and how to signal / expect signals.
507   */
508  public void subscriberTest(TestStageTestRun body) throws Throwable {
509    WhiteboxTestStage stage = new WhiteboxTestStage(env, true);
510    body.run(stage);
511  }
512
513  /**
514   * Provides a {@link WhiteboxTestStage} without performing any additional setup,
515   * like the {@link #subscriberTest(SubscriberWhiteboxVerification.TestStageTestRun)} would.
516   *
517   * Use this method to write tests in which you need full control over when and how the initial {@code subscribe} is signalled.
518   */
519  public void subscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
520    WhiteboxTestStage stage = new WhiteboxTestStage(env, false);
521    body.run(stage);
522  }
523
524  /**
525   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
526   */
527  public void optionalSubscriberTestWithoutSetup(TestStageTestRun body) throws Throwable {
528    try {
529      subscriberTestWithoutSetup(body);
530    } catch (Exception ex) {
531      notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement.");
532    }
533  }
534
535  public class WhiteboxTestStage extends ManualPublisher<T> {
536    public Publisher<T> pub;
537    public ManualSubscriber<T> tees; // gives us access to a stream T values
538    public WhiteboxSubscriberProbe<T> probe;
539
540    public T lastT = null;
541
542    public WhiteboxTestStage(TestEnvironment env) throws InterruptedException {
543      this(env, true);
544    }
545
546    public WhiteboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
547      super(env);
548      if (runDefaultInit) {
549        pub = this.createHelperPublisher(Long.MAX_VALUE);
550        tees = env.newManualSubscriber(pub);
551        probe = new WhiteboxSubscriberProbe<T>(env, subscriber);
552        subscribe(createSubscriber(probe));
553        probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
554        env.verifyNoAsyncErrorsNoDelay();
555      }
556    }
557
558    public Subscriber<? super T> sub() {
559      return subscriber.value();
560    }
561
562    public SubscriberPuppet puppet() {
563      return probe.puppet();
564    }
565
566    public WhiteboxSubscriberProbe<T> probe() {
567      return probe;
568    }
569
570    public Publisher<T> createHelperPublisher(long elements) {
571      return SubscriberWhiteboxVerification.this.createHelperPublisher(elements);
572    }
573
574    public WhiteboxSubscriberProbe<T> createWhiteboxSubscriberProbe(TestEnvironment env) {
575      return new WhiteboxSubscriberProbe<T>(env, subscriber);
576    }
577
578    public T signalNext() throws InterruptedException {
579      return signalNext(nextT());
580    }
581
582    private T signalNext(T element) throws InterruptedException {
583      sendNext(element);
584      return element;
585    }
586
587    public T nextT() throws InterruptedException {
588      lastT = tees.requestNextElement();
589      return lastT;
590    }
591
592    public void verifyNoAsyncErrors() {
593      env.verifyNoAsyncErrors();
594    }
595  }
596
597  /**
598   * This class is intented to be used as {@code Subscriber} decorator and should be used in {@code pub.subscriber(...)} calls,
599   * in order to allow intercepting calls on the underlying {@code Subscriber}.
600   * This delegation allows the proxy to implement {@link BlackboxProbe} assertions.
601   */
602  public static class BlackboxSubscriberProxy<T> extends BlackboxProbe<T> implements Subscriber<T> {
603
604    public BlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> subscriber) {
605      super(env, Promise.<Subscriber<? super T>>completed(env, subscriber));
606    }
607
608    @Override
609    public void onSubscribe(Subscription s) {
610      sub().onSubscribe(s);
611    }
612
613    @Override
614    public void onNext(T t) {
615      registerOnNext(t);
616      sub().onNext(t);
617    }
618
619    @Override
620    public void onError(Throwable cause) {
621      registerOnError(cause);
622      sub().onError(cause);
623    }
624
625    @Override
626    public void onComplete() {
627      registerOnComplete();
628      sub().onComplete();
629    }
630  }
631
632  public static class BlackboxProbe<T> implements SubscriberProbe<T> {
633    protected final TestEnvironment env;
634    protected final Promise<Subscriber<? super T>> subscriber;
635
636    protected final Receptacle<T> elements;
637    protected final Promise<Throwable> error;
638
639    public BlackboxProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
640      this.env = env;
641      this.subscriber = subscriber;
642      elements = new Receptacle<T>(env);
643      error = new Promise<Throwable>(env);
644    }
645
646    @Override
647    public void registerOnNext(T element) {
648      elements.add(element);
649    }
650
651    @Override
652    public void registerOnComplete() {
653      try {
654        elements.complete();
655      } catch (IllegalStateException ex) {
656        // "Queue full", onComplete was already called
657        env.flop("subscriber::onComplete was called a second time, which is illegal according to Rule 1.7");
658      }
659    }
660
661    @Override
662    public void registerOnError(Throwable cause) {
663      try {
664        error.complete(cause);
665      } catch (IllegalStateException ex) {
666        // "Queue full", onError was already called
667        env.flop("subscriber::onError was called a second time, which is illegal according to Rule 1.7");
668      }
669    }
670
671    public T expectNext() throws InterruptedException {
672      return elements.next(env.defaultTimeoutMillis(), String.format("Subscriber %s did not call `registerOnNext(_)`", sub()));
673    }
674
675    public void expectNext(T expected) throws InterruptedException {
676      expectNext(expected, env.defaultTimeoutMillis());
677    }
678
679    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
680      T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
681      if (!received.equals(expected)) {
682        env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
683      }
684    }
685
686    public Subscriber<? super T> sub() {
687      return subscriber.value();
688    }
689
690    public void expectCompletion() throws InterruptedException {
691      expectCompletion(env.defaultTimeoutMillis());
692    }
693
694    public void expectCompletion(long timeoutMillis) throws InterruptedException {
695      expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
696    }
697
698    public void expectCompletion(long timeoutMillis, String msg) throws InterruptedException {
699      elements.expectCompletion(timeoutMillis, msg);
700    }
701
702    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
703    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws InterruptedException {
704      final E err = expectError(expected);
705      String message = err.getMessage();
706      assertTrue(message.contains(requiredMessagePart),
707        String.format("Got expected exception %s but missing message [%s], was: %s", err.getClass(), requiredMessagePart, expected));
708    }
709
710    public <E extends Throwable> E expectError(Class<E> expected) throws InterruptedException {
711      return expectError(expected, env.defaultTimeoutMillis());
712    }
713
714    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
715    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws InterruptedException {
716      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
717      if (error.value() == null) {
718        return env.flopAndFail(String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
719      } else if (expected.isInstance(error.value())) {
720        return (E) error.value();
721      } else {
722        return env.flopAndFail(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
723      }
724    }
725
726    public void expectError(Throwable expected) throws InterruptedException {
727      expectError(expected, env.defaultTimeoutMillis());
728    }
729
730    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
731    public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
732      error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
733      if (error.value() != expected) {
734        env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
735      }
736    }
737
738    public void expectNone() throws InterruptedException {
739      expectNone(env.defaultNoSignalsTimeoutMillis());
740    }
741
742    public void expectNone(long withinMillis) throws InterruptedException {
743      elements.expectNone(withinMillis, "Expected nothing");
744    }
745
746  }
747
748  public static class WhiteboxSubscriberProbe<T> extends BlackboxProbe<T> implements SubscriberPuppeteer {
749    protected Promise<SubscriberPuppet> puppet;
750
751    public WhiteboxSubscriberProbe(TestEnvironment env, Promise<Subscriber<? super T>> subscriber) {
752      super(env, subscriber);
753      puppet = new Promise<SubscriberPuppet>(env);
754    }
755
756    private SubscriberPuppet puppet() {
757      return puppet.value();
758    }
759
760    @Override
761    public void registerOnSubscribe(SubscriberPuppet p) {
762      if (!puppet.isCompleted()) {
763        puppet.complete(p);
764      }
765    }
766
767  }
768
/** Receives the {@link SubscriberPuppet} through which the test suite drives the subscriber under test. */
public interface SubscriberPuppeteer {

  /**
   * Must be called by the test subscriber when it has successfully registered a subscription
   * inside the {@code onSubscribe} method.
   *
   * @param puppet the puppet the test suite will use to trigger requests and cancellation
   */
  void registerOnSubscribe(SubscriberPuppet puppet);
}
777
/** Callbacks through which the test subscriber reports the signals it has received. */
public interface SubscriberProbe<T> {

  /**
   * Must be called by the test subscriber when it has received an {@code onNext} event.
   *
   * @param element the element that was received
   */
  void registerOnNext(T element);

  /**
   * Must be called by the test subscriber when it has received an {@code onComplete} event.
   */
  void registerOnComplete();

  /**
   * Must be called by the test subscriber when it has received an {@code onError} event.
   *
   * @param cause the error that was received
   */
  void registerOnError(Throwable cause);

}
796
/**
 * Implement this puppet in your Whitebox style tests.
 * The test suite will invoke the specific trigger/signal methods requesting you to execute the specific action.
 * Since this is a whitebox style test, you're allowed and expected to use knowledge about your implementation to
 * implement these calls.
 */
public interface SubscriberPuppet {

  /**
   * Ensure that at least {@code elements} are eventually requested by your {@link Subscriber}, if it hasn't already
   * requested that many elements.
   * <p>
   * This does not necessarily have to correlate 1:1 with a {@code Subscription.request(elements)} call, but the sum
   * of the elements requested by your {@code Subscriber} must eventually be at least the sum of the elements
   * triggered to be requested by all the invocations of this method.
   * <p>
   * Additionally, subscribers are permitted to delay requesting elements until previous requests for elements have
   * been fulfilled. For example, a subscriber that only requests one element at a time may fulfill the request made
   * by this method by requesting one element {@code elements} times, waiting for each element to arrive before the
   * next request is made.
   * <p>
   * Before sending any element to the subscriber, the TCK must wait for the subscriber to request that element, and
   * must be prepared for the subscriber to only request one element at a time; it is not enough for the TCK to
   * simply invoke this method before sending elements.
   * <p>
   * An invocation of {@link #signalCancel()} may be coalesced into any elements that have not yet been requested,
   * such that only a cancel signal is emitted.
   *
   * @param elements the number of elements the subscriber should eventually request
   */
  void triggerRequest(long elements);

  /**
   * Trigger {@code cancel()} on your {@link Subscriber}.
   * <p>
   * An invocation of this method may be coalesced into any outstanding requests, as requested by
   * {@link #triggerRequest(long)}, such that only a cancel signal is emitted.
   */
  void signalCancel();
}
835
836  public void notVerified() {
837    throw new SkipException("Not verified using this TCK.");
838  }
839
840  public void notVerified(String msg) {
841    throw new SkipException(msg);
842  }
843}