001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport;
018import org.reactivestreams.tck.TestEnvironment.Latch;
019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
021import org.reactivestreams.tck.flow.support.Function;
022import org.reactivestreams.tck.flow.support.Optional;
023import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
024import org.testng.SkipException;
025import org.testng.annotations.BeforeMethod;
026import org.testng.annotations.Test;
027
028import java.lang.Override;
029import java.lang.ref.ReferenceQueue;
030import java.lang.ref.WeakReference;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.Random;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicReference;
038
039import static org.testng.Assert.assertEquals;
040import static org.testng.Assert.assertTrue;
041
042/**
043 * Provides tests for verifying {@code Publisher} specification rules.
044 *
045 * @see org.reactivestreams.Publisher
046 */
047public abstract class PublisherVerification<T> implements PublisherVerificationRules {
048
  // Name of the environment variable that may override the reference-GC timeout;
  // it is read by envPublisherReferenceGCTimeoutMillis().
  private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS";
  // Timeout, in milliseconds, returned when the environment variable above is not set.
  private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L;

  // Test environment used by the tests in this class to create subscribers and to record/verify asynchronous errors.
  private final TestEnvironment env;

  /**
   * The amount of time (in milliseconds) after which a cancelled Subscriber reference should be dropped.
   * See Rule 3.13 for details.
   */
  private final long publisherReferenceGCTimeoutMillis;
059
060  /**
061   * Constructs a new verification class using the given env and configuration.
062   *
063   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
064   */
065  public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
066    this.env = env;
067    this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis;
068  }
069
070  /**
071   * Constructs a new verification class using the given env and configuration.
072   *
073   * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}.
074   */
075  public PublisherVerification(TestEnvironment env) {
076    this.env = env;
077    this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis();
078  }
079
080  /**
081   * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present,
082   * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}).
083   *
084   * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
085   *
086   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
087   */
088  public static long envPublisherReferenceGCTimeoutMillis() {
089    final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV);
090    if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS;
091    else try {
092      return Long.parseLong(envMillis);
093    } catch (NumberFormatException ex) {
094      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex);
095    }
096  }
097
  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a Publisher for a stream with exactly the given number of elements.
   * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
   *
   * @param elements the exact number of elements the returned Publisher must produce,
   *                 or {@code Long.MAX_VALUE} for an infinite stream
   * @return the Publisher to run the verification against
   */
  public abstract Publisher<T> createPublisher(long elements);
104
  /**
   * By implementing this method, additional TCK tests concerning "failed" publishers will be run.
   *
   * The expected behaviour of the {@link Publisher} returned by this method is to hand out a subscription,
   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
   *
   * If you want to ignore these additional tests, return {@code null} from this method.
   *
   * @return a Publisher in the failed state, or {@code null} to opt out of the related tests
   */
  public abstract Publisher<T> createFailedPublisher();
114
115
116  /**
117   * Override and return lower value if your Publisher is only able to produce a known number of elements.
118   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
119   *
120   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
121   *
122   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
123   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
124   */
125  public long maxElementsFromPublisher() {
126    return Long.MAX_VALUE - 1;
127  }
128
129  /**
130   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
131   * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify—
132   * usually this means that this test case can yield false positives ("be green") even if for some case,
133   * the given implementation may violate the tested behaviour.
134   */
135  public boolean skipStochasticTests() {
136    return false;
137  }
138
139  /**
140   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
141   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
142   * recursive calls to exceed the number returned by this method.
143   *
144   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
145   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
146   */
147  public long boundedDepthOfOnNextAndRequestRecursion() {
148    return 1;
149  }
150
151  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
152
  /**
   * Runs before each test method (TestNG {@code @BeforeMethod}) and clears the asynchronous errors
   * held by the test environment.
   */
  @BeforeMethod
  public void setUp() throws Exception {
    env.clearAsyncErrors();
  }
157
158  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
159
160  @Override @Test
161  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
162    activePublisherTest(1, true, new PublisherTestRun<T>() {
163      @Override
164      public void run(Publisher<T> pub) throws InterruptedException {
165        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
166        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
167        sub.requestEndOfStream();
168      }
169
170      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
171        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
172      }
173
174    });
175  }
176
177  @Override @Test
178  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
179    activePublisherTest(3, true, new PublisherTestRun<T>() {
180      @Override
181      public void run(Publisher<T> pub) throws InterruptedException {
182        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
183        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
184        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub));
185        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub));
186        sub.requestEndOfStream();
187      }
188
189      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
190        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
191      }
192
193    });
194  }
195
196  @Override @Test
197  public void required_validate_maxElementsFromPublisher() throws Exception {
198    assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0");
199  }
200
201  @Override @Test
202  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
203    assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
204  }
205
206
207  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
208
209  @Override @Test
210  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
211    activePublisherTest(5, false, new PublisherTestRun<T>() {
212      @Override
213      public void run(Publisher<T> pub) throws InterruptedException {
214
215        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
216        try {
217            sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
218            sub.request(1);
219            sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
220            sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
221    
222            sub.request(1);
223            sub.request(2);
224            sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
225
226            sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
227        } finally {
228            sub.cancel();
229        }
230      }
231    });
232  }
233
234  @Override @Test
235  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
236    final int elements = 3;
237    final int requested = 10;
238
239    activePublisherTest(elements, true, new PublisherTestRun<T>() {
240      @Override
241      public void run(Publisher<T> pub) throws Throwable {
242        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
243        sub.request(requested);
244        sub.nextElements(elements);
245        sub.expectCompletion();
246      }
247    });
248  }
249
250  @Override @Test
251  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
252    final int iterations = 100;
253    final int elements = 10;
254
255    stochasticTest(iterations, new Function<Integer, Void>() {
256      @Override
257      public Void apply(final Integer runNumber) throws Throwable {
258        activePublisherTest(elements, true, new PublisherTestRun<T>() {
259          @Override
260          public void run(Publisher<T> pub) throws Throwable {
261            final Latch completionLatch = new Latch(env);
262
263            final AtomicInteger gotElements = new AtomicInteger(0);
264            pub.subscribe(new Subscriber<T>() {
265              private Subscription subs;
266
267              private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();
268
269              /**
270               * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect
271               * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness.
272               *
273               * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call
274               * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough
275               * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread
276               * to enter subsequent critical sections ("nesting" them effectively).
277               */
278              final class ConcurrentAccessBarrier {
279                private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null);
280                private volatile String previousSignal = null;
281
282                public void enterSignal(String signalName) {
283                  if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) {
284                    env.flop(String.format(
285                      "Illegal concurrent access detected (entering critical section)! " +
286                        "%s emited %s signal, before %s finished its %s signal.",
287                        Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal));
288                  }
289                  this.previousSignal = signalName;
290                }
291
292                public void leaveSignal(String signalName) {
293                  currentlySignallingThread.set(null);
294                  this.previousSignal = signalName;
295                }
296
297                private boolean isSynchronousSignal() {
298                  return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get());
299                }
300
301              }
302
303              @Override
304              public void onSubscribe(Subscription s) {
305                final String signal = "onSubscribe()";
306                concurrentAccessBarrier.enterSignal(signal);
307
308                subs = s;
309                subs.request(1);
310
311                concurrentAccessBarrier.leaveSignal(signal);
312              }
313
314              @Override
315              public void onNext(T ignore) {
316                final String signal = String.format("onNext(%s)", ignore);
317                concurrentAccessBarrier.enterSignal(signal);
318
319                if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this)
320                  subs.request(1);
321
322                concurrentAccessBarrier.leaveSignal(signal);
323              }
324
325              @Override
326              public void onError(Throwable t) {
327                final String signal = String.format("onError(%s)", t.getMessage());
328                concurrentAccessBarrier.enterSignal(signal);
329
330                // ignore value
331
332                concurrentAccessBarrier.leaveSignal(signal);
333              }
334
335              @Override
336              public void onComplete() {
337                final String signal = "onComplete()";
338                concurrentAccessBarrier.enterSignal(signal);
339
340                // entering for completeness
341
342                concurrentAccessBarrier.leaveSignal(signal);
343                completionLatch.close();
344              }
345            });
346
347            completionLatch.expectClose(
348              elements * env.defaultTimeoutMillis(),
349              String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it",
350                            runNumber, iterations, elements, gotElements.get()));
351          }
352        });
353        return null;
354      }
355    });
356  }
357
358  @Override @Test
359  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
360    try {
361      whenHasErrorPublisherTest(new PublisherTestRun<T>() {
362        @Override
363        public void run(final Publisher<T> pub) throws InterruptedException {
364          final Latch onErrorlatch = new Latch(env);
365          final Latch onSubscribeLatch = new Latch(env);
366          pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
367            @Override
368            public void onSubscribe(Subscription subs) {
369              onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
370              onSubscribeLatch.close();
371            }
372            @Override
373            public void onError(Throwable cause) {
374              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
375              onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
376              onErrorlatch.close();
377            }
378          });
379
380          onSubscribeLatch.expectClose("Should have received onSubscribe");
381          onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
382
383          env.verifyNoAsyncErrors();
384          }
385      });
386    } catch (SkipException se) {
387      throw se;
388    } catch (Throwable ex) {
389      // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe
390      // which was wrong of him - he should have signalled on error using onError
391      throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex);
392    }
393  }
394
395  @Override @Test
396  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
397    activePublisherTest(3, true, new PublisherTestRun<T>() {
398      @Override
399      public void run(Publisher<T> pub) throws Throwable {
400        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
401        sub.requestNextElement();
402        sub.requestNextElement();
403        sub.requestNextElement();
404        sub.requestEndOfStream();
405        sub.expectNone();
406      }
407    });
408  }
409
410  @Override @Test
411  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
412    optionalActivePublisherTest(0, true, new PublisherTestRun<T>() {
413      @Override
414      public void run(Publisher<T> pub) throws Throwable {
415        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
416        sub.request(1);
417        sub.expectCompletion();
418        sub.expectNone();
419      }
420    });
421  }
422
  /**
   * Placeholder for the rule named in the method name; no verification is performed here,
   * the method only calls {@code notVerified()}.
   */
  @Override @Test
  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
    notVerified(); // not really testable without more control over the Publisher
  }
427
428  @Override @Test
429  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
430    activePublisherTest(1, true, new PublisherTestRun<T>() {
431      @Override
432      public void run(Publisher<T> pub) throws Throwable {
433        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
434        sub.request(10);
435        sub.nextElement();
436        sub.expectCompletion();
437
438        sub.request(10);
439        sub.expectNone();
440      }
441    });
442  }
443
  /**
   * Placeholder for the rule named in the method name; no verification is performed here,
   * the method only calls {@code notVerified()}.
   */
  @Override @Test
  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
    notVerified(); // can we meaningfully test this, without more control over the publisher?
  }
448
  /**
   * Placeholder for the rule named in the method name; no verification is performed here,
   * the method only calls {@code notVerified()}.
   */
  @Override @Test
  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
    notVerified(); // can we meaningfully test this?
  }
453
  /**
   * Placeholder for the rule named in the method name; no verification is performed here,
   * the method only calls {@code notVerified()}.
   */
  @Override @Test
  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
    notVerified(); // can we meaningfully test this?
  }
458
459  @Override @Test
460  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
461    activePublisherTest(0, false, new PublisherTestRun<T>() {
462      @Override
463      public void run(Publisher<T> pub) throws Throwable {
464        try {
465            pub.subscribe(null);
466            env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
467        } catch (NullPointerException ignored) {
468          // valid behaviour
469        }
470        env.verifyNoAsyncErrorsNoDelay();
471      }
472    });
473  }
474
475  @Override @Test
476  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
477    activePublisherTest(0, false, new PublisherTestRun<T>() {
478      @Override
479      public void run(Publisher<T> pub) throws Throwable {
480        final Latch onSubscribeLatch = new Latch(env);
481        final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>();
482        try {
483          pub.subscribe(new Subscriber<T>() {
484            @Override
485            public void onError(Throwable cause) {
486              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
487            }
488    
489            @Override
490            public void onSubscribe(Subscription subs) {
491              cancel.set(subs);
492              onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
493              onSubscribeLatch.close();
494            }
495    
496            @Override
497            public void onNext(T elem) {
498              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
499            }
500    
501            @Override
502            public void onComplete() {
503              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
504            }
505          });
506          onSubscribeLatch.expectClose("Should have received onSubscribe");
507          env.verifyNoAsyncErrorsNoDelay();
508        } finally {
509          Subscription s = cancel.getAndSet(null);
510          if (s != null) {
511            s.cancel();
512          }
513        }
514      }
515    });
516  }
517
518  @Override @Test
519  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
520    whenHasErrorPublisherTest(new PublisherTestRun<T>() {
521      @Override
522      public void run(Publisher<T> pub) throws Throwable {
523        final Latch onErrorLatch = new Latch(env);
524        final Latch onSubscribeLatch = new Latch(env);
525        ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
526          @Override
527          public void onError(Throwable cause) {
528            onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
529            onErrorLatch.assertOpen("Only one onError call expected");
530            onErrorLatch.close();
531          }
532
533          @Override
534          public void onSubscribe(Subscription subs) {
535            onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
536            onSubscribeLatch.close();
537          }
538        };
539        pub.subscribe(sub);
540        onSubscribeLatch.expectClose("Should have received onSubscribe");
541        onErrorLatch.expectClose("Should have received onError");
542
543        env.verifyNoAsyncErrorsNoDelay();
544      }
545    });
546  }
547
  /**
   * Placeholder for the rule named in the method name; no verification is performed here,
   * the method only calls {@code notVerified()}.
   */
  @Override @Test
  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
    notVerified(); // can we meaningfully test this?
  }
552
553  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
554  @Override @Test
555  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
556    optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
557      @Override
558      public void run(Publisher<T> pub) throws Throwable {
559        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
560        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
561
562        try {
563          env.verifyNoAsyncErrors();
564        } finally {
565          try {
566            sub1.cancel();
567          } finally {
568            sub2.cancel();
569          }
570        }
571      }
572    });
573  }
574
575  @Override @Test
576  public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
577    optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
578      @Override
579      public void run(Publisher<T> pub) throws Throwable {
580        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
581        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
582        // Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario,
583        // and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an
584        // onNext (and optional onComplete) signal(s) and which just onComplete signal.
585        // Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled
586        // with onNext.
587        sub1.requestNextElementOrEndOfStream();
588        sub2.requestNextElementOrEndOfStream();
589        try {
590            env.verifyNoAsyncErrors();
591        } finally {
592            try {
593                sub1.cancel();
594            } finally {
595                sub2.cancel();
596            }
597        }
598      }
599    });
600  }
601
602  @Override @Test
603  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
604    optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
605      @Override
606      public void run(Publisher<T> pub) throws InterruptedException {
607        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
608        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
609        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
610
611        sub1.request(1);
612        T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
613        sub2.request(2);
614        List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub));
615        sub1.request(1);
616        T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
617        sub3.request(3);
618        List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub));
619        sub3.request(1);
620        T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
621        sub3.request(1);
622        T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
623        sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub));
624        sub2.request(3);
625        List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub));
626        sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub));
627        sub1.request(2);
628        List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub));
629        sub1.request(1);
630        T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
631        sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub));
632
633        @SuppressWarnings("unchecked")
634        List<T> r = new ArrayList<T>(Arrays.asList(x1, x2));
635        r.addAll(x3);
636        r.addAll(Collections.singleton(x4));
637
638        List<T> check1 = new ArrayList<T>(y1);
639        check1.addAll(y2);
640
641        //noinspection unchecked
642        List<T> check2 = new ArrayList<T>(z1);
643        check2.add(z2);
644        check2.add(z3);
645
646        assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub));
647        assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub));
648      }
649    });
650  }
651
652  @Override @Test
653  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
654    optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
655      @Override
656      public void run(Publisher<T> pub) throws Throwable {
657        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
658        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
659        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
660
661        List<T> received1 = new ArrayList<T>();
662        List<T> received2 = new ArrayList<T>();
663        List<T> received3 = new ArrayList<T>();
664
665        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
666        // edgy edge case?
667        sub1.request(4);
668        sub2.request(4);
669        sub3.request(4);
670
671        received1.addAll(sub1.nextElements(3));
672        received2.addAll(sub2.nextElements(3));
673        received3.addAll(sub3.nextElements(3));
674
675        // NOTE: can't check completion, the Publisher may not be able to signal it
676        //       a similar test *with* completion checking is implemented
677
678        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
679        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
680      }
681    });
682  }
683
684  @Override @Test
685  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
686    optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
687      @Override
688      public void run(Publisher<T> pub) throws Throwable {
689        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
690        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
691        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
692
693        List<T> received1 = new ArrayList<T>();
694        List<T> received2 = new ArrayList<T>();
695        List<T> received3 = new ArrayList<T>();
696
697        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
698        // edgy edge case?
699        sub1.request(4);
700        sub2.request(4);
701        sub3.request(4);
702
703        received1.addAll(sub1.nextElements(3));
704        received2.addAll(sub2.nextElements(3));
705        received3.addAll(sub3.nextElements(3));
706
707        sub1.expectCompletion();
708        sub2.expectCompletion();
709        sub3.expectCompletion();
710
711        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
712        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
713      }
714    });
715  }
716
717  ///////////////////// SUBSCRIPTION TESTS //////////////////////////////////
718
719  @Override @Test
720  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
721    activePublisherTest(6, false, new PublisherTestRun<T>() {
722      @Override
723      public void run(Publisher<T> pub) throws Throwable {
724        ManualSubscriber<T> sub = new ManualSubscriber<T>(env) {
725          @Override
726          public void onSubscribe(Subscription subs) {
727            this.subscription.completeImmediatly(subs);
728
729            subs.request(1);
730            subs.request(1);
731            subs.request(1);
732          }
733
734          @Override
735          public void onNext(T element) {
736            Subscription subs = this.subscription.value();
737            subs.request(1);
738          }
739        };
740
741        env.subscribe(pub, sub);
742
743        env.verifyNoAsyncErrors();
744      }
745    });
746  }
747
748  @Override @Test
749  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
750    final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1;
751
752    activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() {
753      @Override
754      public void run(Publisher<T> pub) throws Throwable {
755        final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() {
756          @Override
757          protected Long initialValue() {
758            return 0L;
759          }
760        };
761
762        final Latch runCompleted = new Latch(env);
763
764        final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
765          // counts the number of signals received, used to break out from possibly infinite request/onNext loops
766          long signalsReceived = 0L;
767
768          @Override
769          public void onNext(T element) {
770            // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
771            // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
772
773            signalsReceived += 1;
774            stackDepthCounter.set(stackDepthCounter.get() + 1);
775            if (env.debugEnabled()) {
776              env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
777            }
778
779            final long callsUntilNow = stackDepthCounter.get();
780            if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
781              env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
782                                     callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
783
784              // stop the recursive call chain
785              runCompleted.close();
786              return;
787            } else if (signalsReceived >= oneMoreThanBoundedLimit) {
788              // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
789              // stop the recursive call chain
790              runCompleted.close();
791              return;
792            }
793
794            // request more right away, the Publisher must break the recursion
795            subscription.value().request(1);
796
797            stackDepthCounter.set(stackDepthCounter.get() - 1);
798          }
799
800          @Override
801          public void onComplete() {
802            super.onComplete();
803            runCompleted.close();
804          }
805
806          @Override
807          public void onError(Throwable cause) {
808            super.onError(cause);
809            runCompleted.close();
810          }
811        };
812
813        try {
814          env.subscribe(pub, sub);
815
816          sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
817
818          final String msg = String.format("Unable to validate call stack depth safety, " +
819                                               "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
820                                           oneMoreThanBoundedLimit);
821          runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
822          env.verifyNoAsyncErrorsNoDelay();
823        } finally {
824          // since the request/onNext recursive calls may keep the publisher running "forever",
825          // we MUST cancel it manually before exiting this test case
826          sub.cancel();
827        }
828      }
829    });
830  }
831
832  @Override @Test
833  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
834    notVerified(); // cannot be meaningfully tested, or can it?
835  }
836
837  @Override @Test
838  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
839    notVerified(); // cannot be meaningfully tested, or can it?
840  }
841
842  @Override @Test
843  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
844    activePublisherTest(3, false, new PublisherTestRun<T>() {
845      @Override
846      public void run(Publisher<T> pub) throws Throwable {
847
848        // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the
849        // subscription once it's cancelled (as expected).
850        // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it.
851        ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
852          @Override
853          public void cancel() {
854            if (subscription.isCompleted()) {
855              subscription.value().cancel();
856            } else {
857              env.flop("Cannot cancel a subscription before having received it");
858            }
859          }
860        };
861
862        env.subscribe(pub, sub);
863
864        sub.cancel();
865        sub.request(1);
866        sub.request(1);
867        sub.request(1);
868
869        sub.expectNone();
870        env.verifyNoAsyncErrorsNoDelay();
871      }
872    });
873  }
874
875  @Override @Test
876  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
877    activePublisherTest(1, false, new PublisherTestRun<T>() {
878      @Override
879      public void run(Publisher<T> pub) throws Throwable {
880        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
881
882        // leak the Subscription
883        final Subscription subs = sub.subscription.value();
884
885        subs.cancel();
886        subs.cancel();
887        subs.cancel();
888
889        sub.expectNone();
890        env.verifyNoAsyncErrorsNoDelay();
891      }
892    });
893  }
894
895  @Override @Test
896  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
897    activePublisherTest(10, false, new PublisherTestRun<T>() {
898      @Override public void run(Publisher<T> pub) throws Throwable {
899        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
900        sub.request(0);
901        sub.expectError(IllegalArgumentException.class);
902      }
903    });
904  }
905
906  @Override @Test
907  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
908    activePublisherTest(10, false, new PublisherTestRun<T>() {
909      @Override
910      public void run(Publisher<T> pub) throws Throwable {
911        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
912        final Random r = new Random();
913        sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
914        // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
915        sub.expectError(IllegalArgumentException.class); 
916      }
917    });
918  }
919
920  @Override @Test
921  public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
922    optionalActivePublisherTest(10, false, new PublisherTestRun<T>() {
923      @Override
924      public void run(Publisher<T> pub) throws Throwable {
925        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
926        final Random r = new Random();
927        sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
928        // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
929        sub.expectErrorWithMessage(IllegalArgumentException.class, Arrays.asList("3.9", "non-positive subscription request", "negative subscription request")); 
930      }
931    });
932  }
933
934  @Override @Test
935  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
936    // the publisher is able to signal more elements than the subscriber will be requesting in total
937    final int publisherElements = 20;
938
939    final int demand1 = 10;
940    final int demand2 = 5;
941    final int totalDemand = demand1 + demand2;
942
943    activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {
944      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
945      public void run(Publisher<T> pub) throws Throwable {
946        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
947
948        sub.request(demand1);
949        sub.request(demand2);
950
951        /*
952          NOTE: The order of the nextElement/cancel calls below is very important (!)
953
954          If this ordering was reversed, given an asynchronous publisher,
955          the following scenario would be *legal* and would break this test:
956
957          > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous
958          > AsyncPublisher receives request(5) - demand is now 15
959          ! AsyncPublisher didn't emit any onNext yet (!)
960          > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example
961          ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data
962          ! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher)
963
964          Which is why we must first expect an element, and then cancel, once the producing is "running".
965         */
966        sub.nextElement();
967        sub.cancel();
968
969        int onNextsSignalled = 1;
970
971        boolean stillBeingSignalled;
972        do {
973          // put asyncError if onNext signal received
974          sub.expectNone();
975          Throwable error = env.dropAsyncError();
976
977          if (error == null) {
978            stillBeingSignalled = false;
979          } else {
980            onNextsSignalled += 1;
981            stillBeingSignalled = true;
982          }
983
984          // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
985          assertTrue(onNextsSignalled <= totalDemand,
986                     String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d",
987                                   onNextsSignalled, totalDemand));
988
989        } while (stillBeingSignalled);
990      }
991    });
992
993    env.verifyNoAsyncErrorsNoDelay();
994  }
995
996  @Override @Test
997  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
998    final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>();
999
1000    final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() {
1001      @Override
1002      public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception {
1003        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1004        final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue);
1005
1006        sub.request(1);
1007        sub.nextElement();
1008        sub.cancel();
1009
1010        return ref;
1011      }
1012    };
1013
1014    activePublisherTest(3, false, new PublisherTestRun<T>() {
1015      @Override
1016      public void run(Publisher<T> pub) throws Throwable {
1017        final WeakReference<ManualSubscriber<T>> ref = run.apply(pub);
1018
1019        // cancel may be run asynchronously so we add a sleep before running the GC
1020        // to "resolve" the race
1021        Thread.sleep(publisherReferenceGCTimeoutMillis);
1022        System.gc();
1023
1024        if (!ref.equals(queue.remove(100))) {
1025          env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
1026        }
1027
1028        env.verifyNoAsyncErrorsNoDelay();
1029      }
1030    });
1031  }
1032
1033  @Override @Test
1034  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
1035    final int totalElements = 3;
1036
1037    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
1038      @Override
1039      public void run(Publisher<T> pub) throws Throwable {
1040        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1041        sub.request(Long.MAX_VALUE);
1042
1043        sub.nextElements(totalElements);
1044        sub.expectCompletion();
1045
1046        env.verifyNoAsyncErrorsNoDelay();
1047      }
1048    });
1049  }
1050
1051  @Override @Test
1052  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
1053    final int totalElements = 3;
1054
1055    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
1056      @Override
1057      public void run(Publisher<T> pub) throws Throwable {
1058        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1059        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
1060        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
1061        sub.request(1); // pending = Long.MAX_VALUE
1062
1063        sub.nextElements(totalElements);
1064        sub.expectCompletion();
1065
1066        try {
1067          env.verifyNoAsyncErrorsNoDelay();
1068        } finally {
1069          sub.cancel();
1070        }
1071        
1072      }
1073    });
1074  }
1075
1076  @Override @Test
1077  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
1078    activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
1079      @Override public void run(Publisher<T> pub) throws Throwable {
1080        final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
1081           // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
1082           // so 10 is relatively high and safe even if arbitrarily chosen
1083          int callsCounter = 10;
1084
1085          @Override
1086          public void onNext(T element) {
1087            if (env.debugEnabled()) {
1088              env.debug(String.format("%s::onNext(%s)", this, element));
1089            }
1090            if (subscription.isCompleted()) {
1091              if (callsCounter > 0) {
1092                subscription.value().request(Long.MAX_VALUE - 1);
1093                callsCounter--;
1094              } else {
1095                  subscription.value().cancel();
1096              }
1097            } else {
1098              env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
1099            }
1100          }
1101        };
1102        env.subscribe(pub, sub, env.defaultTimeoutMillis());
1103
1104        // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
1105        // we're pretty sure to overflow from those
1106        sub.request(1);
1107
1108        // no onError should be signalled
1109        try {
1110          env.verifyNoAsyncErrors();
1111        } finally {
1112          sub.cancel();
1113        }
1114      }
1115    });
1116  }
1117
1118  ///////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
1119
1120  ///////////////////// TEST INFRASTRUCTURE /////////////////////////////////
1121
1122  public interface PublisherTestRun<T> {
1123    public void run(Publisher<T> pub) throws Throwable;
1124  }
1125
1126  /**
1127   * Test for feature that SHOULD/MUST be implemented, using a live publisher.
1128   *
1129   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1130   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1131   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1132   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1133   */
1134  public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1135    if (elements > maxElementsFromPublisher()) {
1136      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1137    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1138      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1139                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1140    } else {
1141      Publisher<T> pub = createPublisher(elements);
1142      body.run(pub);
1143      env.verifyNoAsyncErrorsNoDelay();
1144    }
1145  }
1146
1147  /**
1148   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
1149   *
1150   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1151   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1152   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1153   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1154   */
1155  public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1156    if (elements > maxElementsFromPublisher()) {
1157      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1158    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1159      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1160                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1161    } else {
1162
1163      final Publisher<T> pub = createPublisher(elements);
1164      final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement.";
1165
1166      try {
1167        potentiallyPendingTest(pub, body);
1168      } catch (Exception ex) {
1169        notVerified(skipMessage);
1170      } catch (AssertionError ex) {
1171        notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage());
1172      }
1173    }
1174  }
1175
1176  public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE =
1177    "Skipping because no error state Publisher provided, and the test requires it. " +
1178          "Please implement PublisherVerification#createFailedPublisher to run this test.";
1179
1180  public static final String SKIPPING_OPTIONAL_TEST_FAILED =
1181    "Skipping, because provided Publisher does not pass this *additional* verification.";
1182  /**
1183   * Additional test for Publisher in error state
1184   */
1185  public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable {
1186    potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE);
1187  }
1188
1189  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable {
1190    potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED);
1191  }
1192
1193  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable {
1194    if (pub != null) {
1195      body.run(pub);
1196    } else {
1197      throw new SkipException(message);
1198    }
1199  }
1200
1201  /**
1202   * Executes a given test body {@code n} times.
1203   * All the test runs must pass in order for the stochastic test to pass.
1204   */
1205  public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable {
1206    if (skipStochasticTests()) {
1207      notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!");
1208    }
1209
1210    for (int i = 0; i < n; i++) {
1211      body.apply(i);
1212    }
1213  }
1214
1215  public void notVerified() {
1216    throw new SkipException("Not verified by this TCK.");
1217  }
1218
1219  /**
1220   * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher},
1221   * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data.
1222   *
1223   * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b>
1224   */
1225  public long publisherUnableToSignalOnComplete() {
1226    return Long.MAX_VALUE;
1227  }
1228
1229  public void notVerified(String message) {
1230    throw new SkipException(message);
1231  }
1232
1233}