001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
018import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
019import org.reactivestreams.tck.flow.support.Optional;
020import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules;
021import org.reactivestreams.tck.flow.support.TestException;
022import org.testng.SkipException;
023import org.testng.annotations.AfterClass;
024import org.testng.annotations.BeforeClass;
025import org.testng.annotations.BeforeMethod;
026import org.testng.annotations.Test;
027
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030
031import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
032import static org.testng.Assert.assertTrue;
033
034/**
035 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
036 * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing).
037 *
038 * This verification is NOT able to check many of the rules of the spec, and if you want more
039 * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification}
040 * instead.
041 *
042 * @see org.reactivestreams.Subscriber
043 * @see org.reactivestreams.Subscription
044 */
045public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T>
046  implements SubscriberBlackboxVerificationRules {
047
048  protected final TestEnvironment env;
049
050  protected SubscriberBlackboxVerification(TestEnvironment env) {
051    this.env = env;
052  }
053
054  // USER API
055
056  /**
057   * This is the main method you must implement in your test incarnation.
058   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
059   */
060  public abstract Subscriber<T> createSubscriber();
061
062  /**
063   * Override this method if the Subscriber implementation you are verifying
064   * needs an external signal before it signals demand to its Publisher.
065   *
066   * By default this method does nothing.
067   */
068  public void triggerRequest(final Subscriber<? super T> subscriber) {
069    // this method is intentionally left blank
070  }
071
072  // ENV SETUP
073
074  /**
075   * Executor service used by the default provided asynchronous Publisher.
076   * @see #createHelperPublisher(long)
077   */
078  private ExecutorService publisherExecutor;
079  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
080  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
081  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
082
083  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
084
085  @BeforeMethod
086  public void setUp() throws Exception {
087    env.clearAsyncErrors();
088  }
089
090  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
091
092  @Override @Test
093  public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable {
094    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
095      @Override
096      public void run(BlackboxTestStage stage) throws InterruptedException {
097        triggerRequest(stage.subProxy().sub());
098        final long requested = stage.expectRequest();// assuming subscriber wants to consume elements...
099        final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand
100
101        // should cope with up to requested number of elements
102        for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++)
103          stage.signalNext();
104
105        // we complete after `signalsToEmit` (which can be less than `requested`),
106        // which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2
107        stage.sendCompletion();
108      }
109
110      /**
111       * In order to allow some "skid" and not check state on each iteration,
112       * only check {@code stage.isCancelled} every {@code checkInterval}'th iteration.
113       */
114      private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException {
115        if (i % checkInterval == 0) return stage.isCancelled();
116        else return false;
117      }
118    });
119  }
120
121  @Override @Test
122  public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception {
123    notVerified(); // cannot be meaningfully tested, or can it?
124  }
125
126  @Override @Test
127  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
128    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
129      @Override
130      public void run(BlackboxTestStage stage) throws Throwable {
131        final Subscription subs = new Subscription() {
132          @Override
133          public void request(long n) {
134            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
135            if (onCompleteStackTraceElement.isDefined()) {
136              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
137              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
138                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
139            }
140          }
141
142          @Override
143          public void cancel() {
144            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
145            if (onCompleteStackElement.isDefined()) {
146              final StackTraceElement stackElem = onCompleteStackElement.get();
147              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
148                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
149            }
150          }
151        };
152
153        final Subscriber<T> sub = createSubscriber();
154        sub.onSubscribe(subs);
155        sub.onComplete();
156
157        env.verifyNoAsyncErrorsNoDelay();
158      }
159    });
160  }
161
162  @Override @Test
163  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
164    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
165      @Override
166      public void run(BlackboxTestStage stage) throws Throwable {
167        final Subscription subs = new Subscription() {
168          @Override
169          public void request(long n) {
170            Throwable thr = new Throwable();
171            for (StackTraceElement stackElem : thr.getStackTrace()) {
172              if (stackElem.getMethodName().equals("onError")) {
173                env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
174                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
175              }
176            }
177          }
178
179          @Override
180          public void cancel() {
181            Throwable thr = new Throwable();
182            for (StackTraceElement stackElem : thr.getStackTrace()) {
183              if (stackElem.getMethodName().equals("onError")) {
184                env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
185                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
186              }
187            }
188          }
189        };
190
191        final Subscriber<T> sub = createSubscriber();
192        sub.onSubscribe(subs);
193        sub.onError(new TestException());
194
195        env.verifyNoAsyncErrorsNoDelay();
196      }
197    });
198  }
199
200  @Override @Test
201  public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
202    notVerified(); // cannot be meaningfully tested, or can it?
203  }
204
205  @Override @Test
206  public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
207    new BlackboxTestStage(env) {{
208      // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
209      final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env);
210      sub().onSubscribe(
211          new Subscription() {
212            @Override
213            public void request(long elements) {
214              env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements));
215            }
216
217            @Override
218            public void cancel() {
219              secondSubscriptionCancelled.close();
220            }
221
222            @Override
223            public String toString() {
224              return "SecondSubscription(should get cancelled)";
225            }
226          });
227
228      secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
229      env.verifyNoAsyncErrorsNoDelay();
230      sendCompletion(); // we're done, complete the subscriber under test
231    }};
232  }
233
234  @Override @Test
235  public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
236    notVerified(); // cannot be meaningfully tested, or can it?
237  }
238
239  @Override @Test
240  public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
241    notVerified(); // cannot be meaningfully tested, or can it?
242    // the same thread part of the clause can be verified but that is not very useful, or is it?
243  }
244
245  @Override @Test
246  public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
247    notVerified(); // cannot be meaningfully tested as black box, or can it?
248  }
249
250  @Override @Test
251  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
252    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
253      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
254      public void run(BlackboxTestStage stage) throws Throwable {
255        triggerRequest(stage.subProxy().sub());
256        final long notUsed = stage.expectRequest(); // received request signal
257        stage.sub().onComplete();
258        stage.subProxy().expectCompletion();
259      }
260    });
261  }
262
263  @Override @Test
264  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
265    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
266      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
267      public void run(BlackboxTestStage stage) throws Throwable {
268        final Subscriber<? super T> sub = stage.sub();
269        sub.onComplete();
270        stage.subProxy().expectCompletion();
271      }
272    });
273  }
274
275  @Override @Test
276  public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
277    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
278      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
279      public void run(BlackboxTestStage stage) throws Throwable {
280        triggerRequest(stage.subProxy().sub());
281        final long notUsed = stage.expectRequest(); // received request signal
282        stage.sub().onError(new TestException()); // in response to that, we fail
283        stage.subProxy().expectError(Throwable.class);
284      }
285    });
286  }
287
288  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
289  @Override @Test
290  public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
291    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
292      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
293      public void run(BlackboxTestStage stage) throws Throwable {
294
295        stage.sub().onError(new TestException());
296        stage.subProxy().expectError(Throwable.class);
297      }
298    });
299  }
300
301  @Override @Test
302  public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
303    notVerified(); // cannot be meaningfully tested, or can it?
304  }
305
306  @Override @Test
307  public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable {
308    notVerified(); // cannot be meaningfully tested as black box, or can it?
309  }
310
311  @Override @Test
312  public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception {
313    notVerified(); // cannot be meaningfully tested, or can it?
314  }
315
316  @Override @Test
317  public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
318    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
319      @Override
320      public void run(BlackboxTestStage stage) throws Throwable {
321
322        {
323          final Subscriber<T> sub = createSubscriber();
324          boolean gotNPE = false;
325          try {
326            sub.onSubscribe(null);
327          } catch(final NullPointerException expected) {
328            gotNPE = true;
329          }
330          assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
331        }
332
333        env.verifyNoAsyncErrorsNoDelay();
334      }
335    });
336  }
337
338  @Override @Test
339  public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
340    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
341      @Override
342      public void run(BlackboxTestStage stage) throws Throwable {
343        final Subscription subscription = new Subscription() {
344          @Override public void request(final long elements) {}
345          @Override public void cancel() {}
346        };
347
348        {
349          final Subscriber<T> sub = createSubscriber();
350          boolean gotNPE = false;
351          sub.onSubscribe(subscription);
352          try {
353            sub.onNext(null);
354          } catch(final NullPointerException expected) {
355            gotNPE = true;
356          }
357          assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
358        }
359
360        env.verifyNoAsyncErrorsNoDelay();
361      }
362    });
363  }
364
365  @Override @Test
366  public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
367    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
368      @Override
369      public void run(BlackboxTestStage stage) throws Throwable {
370        final Subscription subscription = new Subscription() {
371          @Override public void request(final long elements) {}
372          @Override public void cancel() {}
373        };
374
375        {
376          final Subscriber<T> sub = createSubscriber();
377          boolean gotNPE = false;
378          sub.onSubscribe(subscription);
379          try {
380            sub.onError(null);
381          } catch(final NullPointerException expected) {
382            gotNPE = true;
383          }
384          assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
385        }
386
387        env.verifyNoAsyncErrorsNoDelay();
388      }
389    });
390  }
391
392  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
393
394  @Override @Test
395  public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception {
396    notVerified(); // cannot be meaningfully tested, or can it?
397  }
398
399  @Override @Test
400  public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
401    notVerified(); // cannot be meaningfully tested as black box, or can it?
402  }
403
404  @Override @Test
405  public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
406    notVerified(); // cannot be meaningfully tested, or can it?
407  }
408
409  @Override @Test
410  public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
411    notVerified(); // cannot be meaningfully tested, or can it?
412  }
413
414  @Override @Test
415  public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
416    notVerified(); // cannot be meaningfully tested, or can it?
417  }
418
419  @Override @Test
420  public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
421    notVerified(); // cannot be meaningfully tested, or can it?
422  }
423
424  @Override @Test
425  public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
426    notVerified(); // cannot be meaningfully tested, or can it?
427  }
428
429  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
430
431  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
432
433  abstract class BlackboxTestStageTestRun {
434    public abstract void run(BlackboxTestStage stage) throws Throwable;
435  }
436
437  public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable {
438    BlackboxTestStage stage = new BlackboxTestStage(env, true);
439    body.run(stage);
440  }
441
442  public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable {
443    BlackboxTestStage stage = new BlackboxTestStage(env, false);
444    body.run(stage);
445  }
446
447  public class BlackboxTestStage extends ManualPublisher<T> {
448    public Publisher<T> pub;
449    public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
450
451    public T lastT = null;
452    private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty();
453
454    public BlackboxTestStage(TestEnvironment env) throws InterruptedException {
455      this(env, true);
456    }
457
458    public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
459      super(env);
460      if (runDefaultInit) {
461        pub = this.createHelperPublisher(Long.MAX_VALUE);
462        tees = env.newManualSubscriber(pub);
463        Subscriber<T> sub = createSubscriber();
464        subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub));
465        subscribe(subProxy.get());
466      }
467    }
468
469    public Subscriber<? super T> sub() {
470      return subscriber.value();
471    }
472
473    /**
474     * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber.
475     */
476    public BlackboxSubscriberProxy<T> subProxy() {
477      return subProxy.get();
478    }
479
480    public Publisher<T> createHelperPublisher(long elements) {
481      return SubscriberBlackboxVerification.this.createHelperPublisher(elements);
482    }
483
484    public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) {
485      return new BlackboxSubscriberProxy<T>(env, sub);
486    }
487
488    public T signalNext() throws InterruptedException {
489      T element = nextT();
490      sendNext(element);
491      return element;
492    }
493
494    public T nextT() throws InterruptedException {
495      lastT = tees.requestNextElement();
496      return lastT;
497    }
498
499  }
500
501  public void notVerified() {
502    throw new SkipException("Not verified using this TCK.");
503  }
504}