001package org.reactivestreams.tck;
002
003import org.reactivestreams.Publisher;
004import org.reactivestreams.Subscriber;
005import org.reactivestreams.Subscription;
006import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
007import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
008import org.reactivestreams.tck.support.Optional;
009import org.reactivestreams.tck.support.SubscriberBlackboxVerificationRules;
010import org.reactivestreams.tck.support.TestException;
011import org.testng.SkipException;
012import org.testng.annotations.AfterClass;
013import org.testng.annotations.BeforeClass;
014import org.testng.annotations.BeforeMethod;
015import org.testng.annotations.Test;
016
017import java.util.concurrent.ExecutorService;
018import java.util.concurrent.Executors;
019
020import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
021import static org.testng.Assert.assertTrue;
022
023/**
024 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
025 * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing).
026 *
027 * This verification is NOT able to check many of the rules of the spec, and if you want more
028 * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification}
029 * instead.
030 *
031 * @see org.reactivestreams.Subscriber
032 * @see org.reactivestreams.Subscription
033 */
034public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T> 
035  implements SubscriberBlackboxVerificationRules {
036
037  protected final TestEnvironment env;
038
039  protected SubscriberBlackboxVerification(TestEnvironment env) {
040    this.env = env;
041  }
042
043  // USER API
044
  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
   *
   * The tests in this class call this method whenever they need a subscriber to exercise,
   * so every call is expected to hand back a fresh instance.
   *
   * @return a new {@code Subscriber} to be verified
   */
  public abstract Subscriber<T> createSubscriber();
050
051  /**
052   * Override this method if the Subscriber implementation you are verifying
053   * needs an external signal before it signals demand to its Publisher.
054   *
055   * By default this method does nothing.
056   */
057  public void triggerRequest(final Subscriber<? super T> subscriber) {
058
059  }
060
061  // ENV SETUP
062
063  /**
064   * Executor service used by the default provided asynchronous Publisher.
065   * @see #createHelperPublisher(long)
066   */
067  private ExecutorService publisherExecutor;
068  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
069  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
070  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
071
072  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
073
074  @BeforeMethod
075  public void setUp() throws Exception {
076    env.clearAsyncErrors();
077  }
078
079  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
080
081  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1
082  @Override @Test
083  public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable {
084    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
085      @Override
086      public void run(BlackboxTestStage stage) throws InterruptedException {
087        triggerRequest(stage.subProxy().sub());
088        final long n = stage.expectRequest();// assuming subscriber wants to consume elements...
089
090        // should cope with up to requested number of elements
091        for (int i = 0; i < n; i++)
092          stage.signalNext();
093      }
094    });
095  }
096
097  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2
098  @Override @Test
099  public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception {
100    notVerified(); // cannot be meaningfully tested, or can it?
101  }
102
103  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
104  @Override @Test
105  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
106    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
107      @Override
108      public void run(BlackboxTestStage stage) throws Throwable {
109        final Subscription subs = new Subscription() {
110          @Override
111          public void request(long n) {
112            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
113            if (onCompleteStackTraceElement.isDefined()) {
114              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
115              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
116                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
117            }
118          }
119
120          @Override
121          public void cancel() {
122            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
123            if (onCompleteStackElement.isDefined()) {
124              final StackTraceElement stackElem = onCompleteStackElement.get();
125              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
126                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
127            }
128          }
129        };
130
131        final Subscriber<T> sub = createSubscriber();
132        sub.onSubscribe(subs);
133        sub.onComplete();
134
135        env.verifyNoAsyncErrorsNoDelay();
136      }
137    });
138  }
139
140  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3
141  @Override @Test
142  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
143    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
144      @Override
145      public void run(BlackboxTestStage stage) throws Throwable {
146        final Subscription subs = new Subscription() {
147          @Override
148          public void request(long n) {
149            Throwable thr = new Throwable();
150            for (StackTraceElement stackElem : thr.getStackTrace()) {
151              if (stackElem.getMethodName().equals("onError")) {
152                env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
153                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
154              }
155            }
156          }
157
158          @Override
159          public void cancel() {
160            Throwable thr = new Throwable();
161            for (StackTraceElement stackElem : thr.getStackTrace()) {
162              if (stackElem.getMethodName().equals("onError")) {
163                env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
164                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
165              }
166            }
167          }
168        };
169
170        final Subscriber<T> sub = createSubscriber();
171        sub.onSubscribe(subs);
172        sub.onError(new TestException());
173
174        env.verifyNoAsyncErrorsNoDelay();
175      }
176    });
177  }
178
179  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4
180  @Override @Test
181  public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
182    notVerified(); // cannot be meaningfully tested, or can it?
183  }
184
185  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5
186  @Override @Test
187  public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
188    new BlackboxTestStage(env) {{
189      // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
190      final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env);
191      sub().onSubscribe(
192          new Subscription() {
193            @Override
194            public void request(long elements) {
195              env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements));
196            }
197
198            @Override
199            public void cancel() {
200              secondSubscriptionCancelled.close();
201            }
202
203            @Override
204            public String toString() {
205              return "SecondSubscription(should get cancelled)";
206            }
207          });
208
209      secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
210      env.verifyNoAsyncErrorsNoDelay();
211    }};
212  }
213
214  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6
215  @Override @Test
216  public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
217    notVerified(); // cannot be meaningfully tested, or can it?
218  }
219
220  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7
221  @Override @Test
222  public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
223    notVerified(); // cannot be meaningfully tested, or can it?
224    // the same thread part of the clause can be verified but that is not very useful, or is it?
225  }
226
227  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8
228  @Override @Test
229  public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
230    notVerified(); // cannot be meaningfully tested as black box, or can it?
231  }
232
233  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
234  @Override @Test
235  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
236    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
237      @Override
238      public void run(BlackboxTestStage stage) throws Throwable {
239        final Publisher<T> pub = new Publisher<T>() {
240          @Override public void subscribe(final Subscriber<? super T> s) {
241            s.onSubscribe(new Subscription() {
242              private boolean completed = false;
243
244              @Override public void request(long n) {
245                if (!completed) {
246                  completed = true;
247                  s.onComplete(); // Publisher now realises that it is in fact already completed
248                }
249              }
250
251              @Override public void cancel() {
252                // noop, ignore
253              }
254            });
255          }
256        };
257
258        final Subscriber<T> sub = createSubscriber();
259        final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
260
261        pub.subscribe(probe);
262        triggerRequest(sub);
263        probe.expectCompletion();
264        probe.expectNone();
265
266        env.verifyNoAsyncErrorsNoDelay();
267      }
268    });
269  }
270
271  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
272  @Override @Test
273  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
274    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
275      @Override
276      public void run(BlackboxTestStage stage) throws Throwable {
277        final Publisher<T> pub = new Publisher<T>() {
278          @Override
279          public void subscribe(Subscriber<? super T> s) {
280            s.onComplete();
281          }
282        };
283
284        final Subscriber<T> sub = createSubscriber();
285        final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
286
287        pub.subscribe(probe);
288        probe.expectCompletion();
289
290        env.verifyNoAsyncErrorsNoDelay();
291      }
292    });
293  }
294
295  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
296  @Override @Test
297  public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
298    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
299      @Override
300      @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
301      public void run(BlackboxTestStage stage) throws Throwable {
302        stage.sub().onError(new TestException());
303        stage.subProxy().expectError(Throwable.class);
304      }
305    });
306  }
307
308  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11
309  @Override @Test
310  public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
311    notVerified(); // cannot be meaningfully tested, or can it?
312  }
313
314  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12
315  @Override @Test
316  public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable {
317    notVerified(); // cannot be meaningfully tested as black box, or can it?
318  }
319
320  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
321  @Override @Test
322  public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception {
323    notVerified(); // cannot be meaningfully tested, or can it?
324  }
325
326  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
327  @Override @Test
328  public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
329    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
330      @Override
331      public void run(BlackboxTestStage stage) throws Throwable {
332
333        {
334          final Subscriber<T> sub = createSubscriber();
335          boolean gotNPE = false;
336          try {
337            sub.onSubscribe(null);
338          } catch(final NullPointerException expected) {
339            gotNPE = true;
340          }
341          assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
342        }
343
344        env.verifyNoAsyncErrorsNoDelay();
345      }
346    });
347  }
348
349  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
350  @Override @Test
351  public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
352    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
353      @Override
354      public void run(BlackboxTestStage stage) throws Throwable {
355        final Subscription subscription = new Subscription() {
356          @Override public void request(final long elements) {}
357          @Override public void cancel() {}
358        };
359
360        {
361          final Subscriber<T> sub = createSubscriber();
362          boolean gotNPE = false;
363          sub.onSubscribe(subscription);
364          try {
365            sub.onNext(null);
366          } catch(final NullPointerException expected) {
367            gotNPE = true;
368          }
369          assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
370        }
371
372        env.verifyNoAsyncErrorsNoDelay();
373      }
374    });
375  }
376
377  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13
378  @Override @Test
379  public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
380    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
381      @Override
382      public void run(BlackboxTestStage stage) throws Throwable {
383        final Subscription subscription = new Subscription() {
384          @Override public void request(final long elements) {}
385          @Override public void cancel() {}
386        };
387
388        {
389          final Subscriber<T> sub = createSubscriber();
390          boolean gotNPE = false;
391          sub.onSubscribe(subscription);
392          try {
393            sub.onError(null);
394          } catch(final NullPointerException expected) {
395            gotNPE = true;
396          }
397          assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
398        }
399
400        env.verifyNoAsyncErrorsNoDelay();
401      }
402    });
403  }
404
405  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
406
407  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1
408  @Override @Test
409  public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception {
410    notVerified(); // cannot be meaningfully tested, or can it?
411  }
412
413  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8
414  @Override @Test
415  public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
416    notVerified(); // cannot be meaningfully tested as black box, or can it?
417  }
418
419  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10
420  @Override @Test
421  public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
422    notVerified(); // cannot be meaningfully tested, or can it?
423  }
424
425  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11
426  @Override @Test
427  public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
428    notVerified(); // cannot be meaningfully tested, or can it?
429  }
430
431  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14
432  @Override @Test
433  public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
434    notVerified(); // cannot be meaningfully tested, or can it?
435  }
436
437  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15
438  @Override @Test
439  public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
440    notVerified(); // cannot be meaningfully tested, or can it?
441  }
442
443  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16
444  @Override @Test
445  public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
446    notVerified(); // cannot be meaningfully tested, or can it?
447  }
448
449  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
450
451  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
452
453  abstract class BlackboxTestStageTestRun {
454    public abstract void run(BlackboxTestStage stage) throws Throwable;
455  }
456
457  public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable {
458    BlackboxTestStage stage = new BlackboxTestStage(env, true);
459    body.run(stage);
460  }
461
462  public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable {
463    BlackboxTestStage stage = new BlackboxTestStage(env, false);
464    body.run(stage);
465  }
466
467  public class BlackboxTestStage extends ManualPublisher<T> {
468    public Publisher<T> pub;
469    public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
470
471    public T lastT = null;
472    private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty();
473
474    public BlackboxTestStage(TestEnvironment env) throws InterruptedException {
475      this(env, true);
476    }
477
478    public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
479      super(env);
480      if (runDefaultInit) {
481        pub = this.createHelperPublisher(Long.MAX_VALUE);
482        tees = env.newManualSubscriber(pub);
483        Subscriber<T> sub = createSubscriber();
484        subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub));
485        subscribe(subProxy.get());
486      }
487    }
488
489    public Subscriber<? super T> sub() {
490      return subscriber.value();
491    }
492
493    /**
494     * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber.
495     */
496    public BlackboxSubscriberProxy<T> subProxy() {
497      return subProxy.get();
498    }
499
500    public Publisher<T> createHelperPublisher(long elements) {
501      return SubscriberBlackboxVerification.this.createHelperPublisher(elements);
502    }
503
504    public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) {
505      return new BlackboxSubscriberProxy<T>(env, sub);
506    }
507
508    public T signalNext() throws InterruptedException {
509      T element = nextT();
510      sendNext(element);
511      return element;
512    }
513
514    public T nextT() throws InterruptedException {
515      lastT = tees.requestNextElement();
516      return lastT;
517    }
518
519  }
520
521  public void notVerified() {
522    throw new SkipException("Not verified using this TCK.");
523  }
524}