001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams.tck;
006
007import org.reactivestreams.Publisher;
008import org.reactivestreams.Subscriber;
009import org.reactivestreams.Subscription;
010import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
011import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
012import org.reactivestreams.tck.flow.support.Optional;
013import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules;
014import org.reactivestreams.tck.flow.support.TestException;
015import org.testng.SkipException;
016import org.testng.annotations.AfterClass;
017import org.testng.annotations.BeforeClass;
018import org.testng.annotations.BeforeMethod;
019import org.testng.annotations.Test;
020
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Executors;
023
024import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
025import static org.testng.Assert.assertTrue;
026
027/**
028 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
029 * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing).
030 *
031 * This verification is NOT able to check many of the rules of the spec, and if you want more
032 * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification}
033 * instead.
034 *
035 * @see org.reactivestreams.Subscriber
036 * @see org.reactivestreams.Subscription
037 */
038public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T>
039  implements SubscriberBlackboxVerificationRules {
040
  /** Environment through which the tests below report failures and check for asynchronously recorded errors. */
  protected final TestEnvironment env;

  /**
   * Creates the verification and stores the given environment in {@link #env}.
   *
   * @param env test environment used by every test of this class
   */
  protected SubscriberBlackboxVerification(TestEnvironment env) {
    this.env = env;
  }
046
047  // USER API
048
  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
   *
   * It is invoked by the individual tests, either directly or while a {@code BlackboxTestStage}
   * performs its default initialisation.
   *
   * @return the {@code Subscriber} to be verified
   */
  public abstract Subscriber<T> createSubscriber();
054
  /**
   * Override this method if the Subscriber implementation you are verifying
   * needs an external signal before it signals demand to its Publisher.
   *
   * By default this method does nothing.
   *
   * @param subscriber the subscriber that should be prompted to signal demand
   */
  public void triggerRequest(final Subscriber<? super T> subscriber) {
    // this method is intentionally left blank
  }
064
065  // ENV SETUP
066
067  /**
068   * Executor service used by the default provided asynchronous Publisher.
069   * @see #createHelperPublisher(long)
070   */
071  private ExecutorService publisherExecutor;
072  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
073  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
074  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
075
076  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
077
  /**
   * Runs before every test method and clears the asynchronous errors held by {@link #env},
   * so that errors recorded by one test are not carried over into the next.
   */
  @BeforeMethod
  public void setUp() throws Exception {
    env.clearAsyncErrors();
  }
082
083  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
084
085  @Override @Test
086  public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable {
087    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
088      @Override
089      public void run(BlackboxTestStage stage) throws InterruptedException {
090        triggerRequest(stage.subProxy().sub());
091        final long requested = stage.expectRequest();// assuming subscriber wants to consume elements...
092        final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand
093
094        // should cope with up to requested number of elements
095        for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++)
096          stage.signalNext();
097
098        // we complete after `signalsToEmit` (which can be less than `requested`),
099        // which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2
100        stage.sendCompletion();
101      }
102
103      /**
104       * In order to allow some "skid" and not check state on each iteration,
105       * only check {@code stage.isCancelled} every {@code checkInterval}'th iteration.
106       */
107      private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException {
108        if (i % checkInterval == 0) return stage.isCancelled();
109        else return false;
110      }
111    });
112  }
113
  /**
   * Placeholder for rule 2.2, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
118
119  @Override @Test
120  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
121    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
122      @Override
123      public void run(BlackboxTestStage stage) throws Throwable {
124        final String onCompleteMethod = "required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call";
125
126        final Subscription subs = new Subscription() {
127          @Override
128          public void request(long n) {
129            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace(onCompleteMethod);
130            if (onCompleteStackTraceElement.isDefined()) {
131              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
132              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
133                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
134            }
135          }
136
137          @Override
138          public void cancel() {
139            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onCompleteMethod);
140            if (onCompleteStackElement.isDefined()) {
141              final StackTraceElement stackElem = onCompleteStackElement.get();
142              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
143                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
144            }
145          }
146        };
147
148        final Subscriber<T> sub = createSubscriber();
149        sub.onSubscribe(subs);
150        required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(sub);
151
152        env.verifyNoAsyncErrorsNoDelay();
153      }
154
155      /** Makes sure the onComplete is initiated with a recognizable stacktrace element on the current thread. */
156      void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete_call(Subscriber<?> sub) {
157        sub.onComplete();
158      }
159    });
160  }
161
162  @Override @Test
163  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
164    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
165      @Override
166      public void run(BlackboxTestStage stage) throws Throwable {
167        final String onErrorMethod = "required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call";
168
169        final Subscription subs = new Subscription() {
170          @Override
171          public void request(long n) {
172            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod);
173            if (onCompleteStackElement.isDefined()) {
174              final StackTraceElement stackElem = onCompleteStackElement.get();
175              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
176                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
177            }
178          }
179
180          @Override
181          public void cancel() {
182            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace(onErrorMethod);
183            if (onCompleteStackElement.isDefined()) {
184              final StackTraceElement stackElem = onCompleteStackElement.get();
185              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
186                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
187            }
188          }
189        };
190
191        final Subscriber<T> sub = createSubscriber();
192        sub.onSubscribe(subs);
193        required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(sub);
194
195        env.verifyNoAsyncErrorsNoDelay();
196      }
197
198      /** Makes sure the onError is initiated with a recognizable stacktrace element on the current thread. */
199      void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError_call(Subscriber<?> sub) {
200        sub.onError(new TestException());
201      }
202    });
203  }
204
  /**
   * Placeholder for rule 2.4, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
209
210  @Override @Test
211  public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
212    new BlackboxTestStage(env) {{
213      // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
214      final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env);
215      sub().onSubscribe(
216          new Subscription() {
217            @Override
218            public void request(long elements) {
219              env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements));
220            }
221
222            @Override
223            public void cancel() {
224              secondSubscriptionCancelled.close();
225            }
226
227            @Override
228            public String toString() {
229              return "SecondSubscription(should get cancelled)";
230            }
231          });
232
233      secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
234      env.verifyNoAsyncErrorsNoDelay();
235      sendCompletion(); // we're done, complete the subscriber under test
236    }};
237  }
238
  /**
   * Placeholder for rule 2.6, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
243
  /**
   * Placeholder for rule 2.7, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
    // the same thread part of the clause can be verified but that is not very useful, or is it?
  }
249
  /**
   * Placeholder for rule 2.8, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
    notVerified(); // cannot be meaningfully tested as black box, or can it?
  }
254
255  @Override @Test
256  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
257    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
258      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
259      public void run(BlackboxTestStage stage) throws Throwable {
260        triggerRequest(stage.subProxy().sub());
261        final long notUsed = stage.expectRequest(); // received request signal
262        stage.sub().onComplete();
263        stage.subProxy().expectCompletion();
264      }
265    });
266  }
267
268  @Override @Test
269  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
270    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
271      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
272      public void run(BlackboxTestStage stage) throws Throwable {
273        final Subscriber<? super T> sub = stage.sub();
274        sub.onComplete();
275        stage.subProxy().expectCompletion();
276      }
277    });
278  }
279
280  @Override @Test
281  public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
282    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
283      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
284      public void run(BlackboxTestStage stage) throws Throwable {
285        triggerRequest(stage.subProxy().sub());
286        final long notUsed = stage.expectRequest(); // received request signal
287        stage.sub().onError(new TestException()); // in response to that, we fail
288        stage.subProxy().expectError(Throwable.class);
289      }
290    });
291  }
292
293  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
294  @Override @Test
295  public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
296    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
297      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
298      public void run(BlackboxTestStage stage) throws Throwable {
299
300        stage.sub().onError(new TestException());
301        stage.subProxy().expectError(Throwable.class);
302      }
303    });
304  }
305
  /**
   * Placeholder for rule 2.11, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
310
  /**
   * Placeholder for rule 2.12, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable {
    notVerified(); // cannot be meaningfully tested as black box, or can it?
  }
315
  /**
   * Placeholder for the part of rule 2.13 named by this method, which this black-box
   * verification does not check. Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
320
321  @Override @Test
322  public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
323    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
324      @Override
325      public void run(BlackboxTestStage stage) throws Throwable {
326
327        {
328          final Subscriber<T> sub = createSubscriber();
329          boolean gotNPE = false;
330          try {
331            sub.onSubscribe(null);
332          } catch(final NullPointerException expected) {
333            gotNPE = true;
334          }
335          assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
336        }
337
338        env.verifyNoAsyncErrorsNoDelay();
339      }
340    });
341  }
342
343  @Override @Test
344  public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
345    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
346      @Override
347      public void run(BlackboxTestStage stage) throws Throwable {
348        final Subscription subscription = new Subscription() {
349          @Override public void request(final long elements) {}
350          @Override public void cancel() {}
351        };
352
353        {
354          final Subscriber<T> sub = createSubscriber();
355          boolean gotNPE = false;
356          sub.onSubscribe(subscription);
357          try {
358            sub.onNext(null);
359          } catch(final NullPointerException expected) {
360            gotNPE = true;
361          }
362          assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
363        }
364
365        env.verifyNoAsyncErrorsNoDelay();
366      }
367    });
368  }
369
370  @Override @Test
371  public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
372    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
373      @Override
374      public void run(BlackboxTestStage stage) throws Throwable {
375        final Subscription subscription = new Subscription() {
376          @Override public void request(final long elements) {}
377          @Override public void cancel() {}
378        };
379
380        {
381          final Subscriber<T> sub = createSubscriber();
382          boolean gotNPE = false;
383          sub.onSubscribe(subscription);
384          try {
385            sub.onError(null);
386          } catch(final NullPointerException expected) {
387            gotNPE = true;
388          }
389          assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
390        }
391
392        env.verifyNoAsyncErrorsNoDelay();
393      }
394    });
395  }
396
397  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
398
  /**
   * Placeholder for rule 3.1, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
403
  /**
   * Placeholder for rule 3.8, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
    notVerified(); // cannot be meaningfully tested as black box, or can it?
  }
408
  /**
   * Placeholder for rule 3.10, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
413
  /**
   * Placeholder for rule 3.11, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
418
  /**
   * Placeholder for rule 3.14, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
423
  /**
   * Placeholder for rule 3.15, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
428
  /**
   * Placeholder for rule 3.16, which this black-box verification does not check.
   * Always skips by calling {@link #notVerified()}.
   */
  @Override @Test
  public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
    notVerified(); // cannot be meaningfully tested, or can it?
  }
433
434  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
435
436  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
437
  /**
   * Body of a black-box test. Instances are handed to {@code blackboxSubscriberTest} or
   * {@code blackboxSubscriberWithoutSetupTest}, which create a {@link BlackboxTestStage}
   * and pass it to {@link #run(BlackboxTestStage)}.
   */
  abstract class BlackboxTestStageTestRun {
    /**
     * Executes the test logic against the given stage.
     *
     * @param stage the stage created for this test run
     * @throws Throwable any failure raised by the test logic
     */
    public abstract void run(BlackboxTestStage stage) throws Throwable;
  }
441
442  public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable {
443    BlackboxTestStage stage = new BlackboxTestStage(env, true);
444    body.run(stage);
445  }
446
447  public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable {
448    BlackboxTestStage stage = new BlackboxTestStage(env, false);
449    body.run(stage);
450  }
451
452  public class BlackboxTestStage extends ManualPublisher<T> {
453    public Publisher<T> pub;
454    public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
455
456    public T lastT = null;
457    private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty();
458
459    public BlackboxTestStage(TestEnvironment env) throws InterruptedException {
460      this(env, true);
461    }
462
463    public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
464      super(env);
465      if (runDefaultInit) {
466        pub = this.createHelperPublisher(Long.MAX_VALUE);
467        tees = env.newManualSubscriber(pub);
468        Subscriber<T> sub = createSubscriber();
469        subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub));
470        subscribe(subProxy.get());
471      }
472    }
473
474    public Subscriber<? super T> sub() {
475      return subscriber.value();
476    }
477
478    /**
479     * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber.
480     */
481    public BlackboxSubscriberProxy<T> subProxy() {
482      return subProxy.get();
483    }
484
485    public Publisher<T> createHelperPublisher(long elements) {
486      return SubscriberBlackboxVerification.this.createHelperPublisher(elements);
487    }
488
489    public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) {
490      return new BlackboxSubscriberProxy<T>(env, sub);
491    }
492
493    public T signalNext() throws InterruptedException {
494      T element = nextT();
495      sendNext(element);
496      return element;
497    }
498
499    public T nextT() throws InterruptedException {
500      lastT = tees.requestNextElement();
501      return lastT;
502    }
503
504  }
505
  /**
   * Marks the calling test as skipped: always throws a {@link SkipException}.
   *
   * @throws SkipException always
   */
  public void notVerified() {
    throw new SkipException("Not verified using this TCK.");
  }
509}