001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport;
018import org.reactivestreams.tck.TestEnvironment.Latch;
019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
021import org.reactivestreams.tck.flow.support.Function;
022import org.reactivestreams.tck.flow.support.Optional;
023import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
024import org.testng.SkipException;
025import org.testng.annotations.BeforeMethod;
026import org.testng.annotations.Test;
027
028import java.lang.Override;
029import java.lang.ref.ReferenceQueue;
030import java.lang.ref.WeakReference;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.Random;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicReference;
038
039import static org.testng.Assert.assertEquals;
040import static org.testng.Assert.assertTrue;
041
042/**
043 * Provides tests for verifying {@code Publisher} specification rules.
044 *
045 * @see org.reactivestreams.Publisher
046 */
047public abstract class PublisherVerification<T> implements PublisherVerificationRules {
048
049  private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS";
050  private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L;
051
052  private final TestEnvironment env;
053
054  /**
055   * The amount of time after which a cancelled Subscriber reference should be dropped.
056   * See Rule 3.13 for details.
057   */
058  private final long publisherReferenceGCTimeoutMillis;
059
060  /**
061   * Constructs a new verification class using the given env and configuration.
062   *
063   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
064   */
065  public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
066    this.env = env;
067    this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis;
068  }
069
070  /**
071   * Constructs a new verification class using the given env and configuration.
072   *
073   * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}.
074   */
075  public PublisherVerification(TestEnvironment env) {
076    this.env = env;
077    this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis();
078  }
079
080  /**
081   * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present,
082   * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}).
083   *
084   * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
085   *
086   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
087   */
088  public static long envPublisherReferenceGCTimeoutMillis() {
089    final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV);
090    if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS;
091    else try {
092      return Long.parseLong(envMillis);
093    } catch (NumberFormatException ex) {
094      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex);
095    }
096  }
097
098  /**
099   * This is the main method you must implement in your test incarnation.
100   * It must create a Publisher for a stream with exactly the given number of elements.
101   * If `elements` is `Long.MAX_VALUE` the produced stream must be infinite.
102   */
103  public abstract Publisher<T> createPublisher(long elements);
104
105  /**
106   * By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
107   *
108   * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription,
109   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
110   *
111   * If you ignore these additional tests, return {@code null} from this method.
112   */
113  public abstract Publisher<T> createFailedPublisher();
114
115
116  /**
117   * Override and return lower value if your Publisher is only able to produce a known number of elements.
118   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
119   *
120   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
121   *
122   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
123   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
124   */
125  public long maxElementsFromPublisher() {
126    return Long.MAX_VALUE - 1;
127  }
128
129  /**
130   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
131   * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify—
132   * usually this means that this test case can yield false positives ("be green") even if for some case,
133   * the given implementation may violate the tested behaviour.
134   */
135  public boolean skipStochasticTests() {
136    return false;
137  }
138
139  /**
140   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
141   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
142   * recursive calls to exceed the number returned by this method.
143   *
144   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
145   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
146   */
147  public long boundedDepthOfOnNextAndRequestRecursion() {
148    return 1;
149  }
150
151  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
152
153  @BeforeMethod
154  public void setUp() throws Exception {
155    env.clearAsyncErrors();
156  }
157
158  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
159
160  @Override @Test
161  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
162    activePublisherTest(1, true, new PublisherTestRun<T>() {
163      @Override
164      public void run(Publisher<T> pub) throws InterruptedException {
165        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
166        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
167        sub.requestEndOfStream();
168      }
169
170      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
171        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
172      }
173
174    });
175  }
176
177  @Override @Test
178  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
179    activePublisherTest(3, true, new PublisherTestRun<T>() {
180      @Override
181      public void run(Publisher<T> pub) throws InterruptedException {
182        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
183        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
184        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub));
185        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub));
186        sub.requestEndOfStream();
187      }
188
189      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
190        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
191      }
192
193    });
194  }
195
196  @Override @Test
197  public void required_validate_maxElementsFromPublisher() throws Exception {
198    assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0");
199  }
200
201  @Override @Test
202  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
203    assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
204  }
205
206
207  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
208
209  @Override @Test
210  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
211    activePublisherTest(5, false, new PublisherTestRun<T>() {
212      @Override
213      public void run(Publisher<T> pub) throws InterruptedException {
214
215        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
216        try {
217            sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
218            sub.request(1);
219            sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
220            sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
221    
222            sub.request(1);
223            sub.request(2);
224            sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
225
226            sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
227        } finally {
228            sub.cancel();
229        }
230      }
231    });
232  }
233
234  @Override @Test
235  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
236    final int elements = 3;
237    final int requested = 10;
238
239    activePublisherTest(elements, true, new PublisherTestRun<T>() {
240      @Override
241      public void run(Publisher<T> pub) throws Throwable {
242        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
243        sub.request(requested);
244        sub.nextElements(elements);
245        sub.expectCompletion();
246      }
247    });
248  }
249
250  @Override @Test
251  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
252    final int iterations = 100;
253    final int elements = 10;
254
255    stochasticTest(iterations, new Function<Integer, Void>() {
256      @Override
257      public Void apply(final Integer runNumber) throws Throwable {
258        activePublisherTest(elements, true, new PublisherTestRun<T>() {
259          @Override
260          public void run(Publisher<T> pub) throws Throwable {
261            final Latch completionLatch = new Latch(env);
262
263            final AtomicInteger gotElements = new AtomicInteger(0);
264            pub.subscribe(new Subscriber<T>() {
265              private Subscription subs;
266
267              private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();
268
269              /**
270               * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect
271               * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness.
272               *
273               * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call
274               * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough
275               * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread
276               * to enter subsequent critical sections ("nesting" them effectively).
277               */
278              final class ConcurrentAccessBarrier {
279                private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null);
280                private volatile String previousSignal = null;
281
282                public void enterSignal(String signalName) {
283                  if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) {
284                    env.flop(String.format(
285                      "Illegal concurrent access detected (entering critical section)! " +
286                        "%s emited %s signal, before %s finished its %s signal.",
287                        Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal));
288                  }
289                  this.previousSignal = signalName;
290                }
291
292                public void leaveSignal(String signalName) {
293                  currentlySignallingThread.set(null);
294                  this.previousSignal = signalName;
295                }
296
297                private boolean isSynchronousSignal() {
298                  return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get());
299                }
300
301              }
302
303              @Override
304              public void onSubscribe(Subscription s) {
305                final String signal = "onSubscribe()";
306                concurrentAccessBarrier.enterSignal(signal);
307
308                subs = s;
309                subs.request(1);
310
311                concurrentAccessBarrier.leaveSignal(signal);
312              }
313
314              @Override
315              public void onNext(T ignore) {
316                final String signal = String.format("onNext(%s)", ignore);
317                concurrentAccessBarrier.enterSignal(signal);
318
319                if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this)
320                  subs.request(1);
321
322                concurrentAccessBarrier.leaveSignal(signal);
323              }
324
325              @Override
326              public void onError(Throwable t) {
327                final String signal = String.format("onError(%s)", t.getMessage());
328                concurrentAccessBarrier.enterSignal(signal);
329
330                // ignore value
331
332                concurrentAccessBarrier.leaveSignal(signal);
333              }
334
335              @Override
336              public void onComplete() {
337                final String signal = "onComplete()";
338                concurrentAccessBarrier.enterSignal(signal);
339
340                // entering for completeness
341
342                concurrentAccessBarrier.leaveSignal(signal);
343                completionLatch.close();
344              }
345            });
346
347            completionLatch.expectClose(
348              elements * env.defaultTimeoutMillis(),
349              String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it",
350                            runNumber, iterations, elements, gotElements.get()));
351          }
352        });
353        return null;
354      }
355    });
356  }
357
358  @Override @Test
359  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
360    try {
361      whenHasErrorPublisherTest(new PublisherTestRun<T>() {
362        @Override
363        public void run(final Publisher<T> pub) throws InterruptedException {
364          final Latch onErrorlatch = new Latch(env);
365          final Latch onSubscribeLatch = new Latch(env);
366          pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
367            @Override
368            public void onSubscribe(Subscription subs) {
369              onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
370              onSubscribeLatch.close();
371            }
372            @Override
373            public void onError(Throwable cause) {
374              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
375              onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
376              onErrorlatch.close();
377            }
378          });
379
380          onSubscribeLatch.expectClose("Should have received onSubscribe");
381          onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
382
383          env.verifyNoAsyncErrors();
384          }
385      });
386    } catch (SkipException se) {
387      throw se;
388    } catch (Throwable ex) {
389      // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe
390      // which was wrong of him - he should have signalled on error using onError
391      throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex);
392    }
393  }
394
395  @Override @Test
396  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
397    activePublisherTest(3, true, new PublisherTestRun<T>() {
398      @Override
399      public void run(Publisher<T> pub) throws Throwable {
400        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
401        sub.requestNextElement();
402        sub.requestNextElement();
403        sub.requestNextElement();
404        sub.requestEndOfStream();
405        sub.expectNone();
406      }
407    });
408  }
409
410  @Override @Test
411  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
412    optionalActivePublisherTest(0, true, new PublisherTestRun<T>() {
413      @Override
414      public void run(Publisher<T> pub) throws Throwable {
415        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
416        sub.request(1);
417        sub.expectCompletion();
418        sub.expectNone();
419      }
420    });
421  }
422
423  @Override @Test
424  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
425    notVerified(); // not really testable without more control over the Publisher
426  }
427
428  @Override @Test
429  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
430    activePublisherTest(1, true, new PublisherTestRun<T>() {
431      @Override
432      public void run(Publisher<T> pub) throws Throwable {
433        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
434        sub.request(10);
435        sub.nextElement();
436        sub.expectCompletion();
437
438        sub.request(10);
439        sub.expectNone();
440      }
441    });
442  }
443
444  @Override @Test
445  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
446    notVerified(); // can we meaningfully test this, without more control over the publisher?
447  }
448
449  @Override @Test
450  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
451    notVerified(); // can we meaningfully test this?
452  }
453
454  @Override @Test
455  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
456    notVerified(); // can we meaningfully test this?
457  }
458
459  @Override @Test
460  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
461    activePublisherTest(0, false, new PublisherTestRun<T>() {
462      @Override
463      public void run(Publisher<T> pub) throws Throwable {
464        try {
465            pub.subscribe(null);
466            env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
467        } catch (NullPointerException ignored) {
468          // valid behaviour
469        }
470        env.verifyNoAsyncErrorsNoDelay();
471      }
472    });
473  }
474
475  @Override @Test
476  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
477    activePublisherTest(0, false, new PublisherTestRun<T>() {
478      @Override
479      public void run(Publisher<T> pub) throws Throwable {
480        final Latch onSubscribeLatch = new Latch(env);
481        final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>();
482        try {
483          pub.subscribe(new Subscriber<T>() {
484            @Override
485            public void onError(Throwable cause) {
486              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
487            }
488    
489            @Override
490            public void onSubscribe(Subscription subs) {
491              cancel.set(subs);
492              onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
493              onSubscribeLatch.close();
494            }
495    
496            @Override
497            public void onNext(T elem) {
498              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
499            }
500    
501            @Override
502            public void onComplete() {
503              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
504            }
505          });
506          onSubscribeLatch.expectClose("Should have received onSubscribe");
507          env.verifyNoAsyncErrorsNoDelay();
508        } finally {
509          Subscription s = cancel.getAndSet(null);
510          if (s != null) {
511            s.cancel();
512          }
513        }
514      }
515    });
516  }
517
518  @Override @Test
519  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
520    whenHasErrorPublisherTest(new PublisherTestRun<T>() {
521      @Override
522      public void run(Publisher<T> pub) throws Throwable {
523        final Latch onErrorLatch = new Latch(env);
524        final Latch onSubscribeLatch = new Latch(env);
525        ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
526          @Override
527          public void onError(Throwable cause) {
528            onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
529            onErrorLatch.assertOpen("Only one onError call expected");
530            onErrorLatch.close();
531          }
532
533          @Override
534          public void onSubscribe(Subscription subs) {
535            onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
536            onSubscribeLatch.close();
537          }
538        };
539        pub.subscribe(sub);
540        onSubscribeLatch.expectClose("Should have received onSubscribe");
541        onErrorLatch.expectClose("Should have received onError");
542
543        env.verifyNoAsyncErrorsNoDelay();
544      }
545    });
546  }
547
548  @Override @Test
549  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
550    notVerified(); // can we meaningfully test this?
551  }
552
553  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
554  @Override @Test
555  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
556    optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
557      @Override
558      public void run(Publisher<T> pub) throws Throwable {
559        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
560        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
561
562        try {
563          env.verifyNoAsyncErrors();
564        } finally {
565          try {
566            sub1.cancel();
567          } finally {
568            sub2.cancel();
569          }
570        }
571      }
572    });
573  }
574
575  @Override @Test
576  public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
577    optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
578      @Override
579      public void run(Publisher<T> pub) throws Throwable {
580        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
581        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
582        // Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario,
583        // and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an
584        // onNext (and optional onComplete) signal(s) and which just onComplete signal.
585        // Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled
586        // with onNext.
587        sub1.requestNextElementOrEndOfStream();
588        sub2.requestNextElementOrEndOfStream();
589        try {
590            env.verifyNoAsyncErrors();
591        } finally {
592            try {
593                sub1.cancel();
594            } finally {
595                sub2.cancel();
596            }
597        }
598      }
599    });
600  }
601
602  @Override @Test
603  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
604    optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
605      @Override
606      public void run(Publisher<T> pub) throws InterruptedException {
607        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
608        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
609        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
610
611        sub1.request(1);
612        T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
613        sub2.request(2);
614        List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub));
615        sub1.request(1);
616        T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
617        sub3.request(3);
618        List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub));
619        sub3.request(1);
620        T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
621        sub3.request(1);
622        T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
623        sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub));
624        sub2.request(3);
625        List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub));
626        sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub));
627        sub1.request(2);
628        List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub));
629        sub1.request(1);
630        T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
631        sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub));
632
633        @SuppressWarnings("unchecked")
634        List<T> r = new ArrayList<T>(Arrays.asList(x1, x2));
635        r.addAll(x3);
636        r.addAll(Collections.singleton(x4));
637
638        List<T> check1 = new ArrayList<T>(y1);
639        check1.addAll(y2);
640
641        //noinspection unchecked
642        List<T> check2 = new ArrayList<T>(z1);
643        check2.add(z2);
644        check2.add(z3);
645
646        assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub));
647        assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub));
648      }
649    });
650  }
651
652  @Override @Test
653  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
654    optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
655      @Override
656      public void run(Publisher<T> pub) throws Throwable {
657        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
658        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
659        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
660
661        List<T> received1 = new ArrayList<T>();
662        List<T> received2 = new ArrayList<T>();
663        List<T> received3 = new ArrayList<T>();
664
665        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
666        // edgy edge case?
667        sub1.request(4);
668        sub2.request(4);
669        sub3.request(4);
670
671        received1.addAll(sub1.nextElements(3));
672        received2.addAll(sub2.nextElements(3));
673        received3.addAll(sub3.nextElements(3));
674
675        // NOTE: can't check completion, the Publisher may not be able to signal it
676        //       a similar test *with* completion checking is implemented
677
678        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
679        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
680      }
681    });
682  }
683
684  @Override @Test
685  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
686    optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
687      @Override
688      public void run(Publisher<T> pub) throws Throwable {
689        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
690        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
691        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
692
693        List<T> received1 = new ArrayList<T>();
694        List<T> received2 = new ArrayList<T>();
695        List<T> received3 = new ArrayList<T>();
696
697        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
698        // edgy edge case?
699        sub1.request(4);
700        sub2.request(4);
701        sub3.request(4);
702
703        received1.addAll(sub1.nextElements(3));
704        received2.addAll(sub2.nextElements(3));
705        received3.addAll(sub3.nextElements(3));
706
707        sub1.expectCompletion();
708        sub2.expectCompletion();
709        sub3.expectCompletion();
710
711        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
712        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
713      }
714    });
715  }
716
717  ///////////////////// SUBSCRIPTION TESTS //////////////////////////////////
718
719  @Override @Test
720  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
721    activePublisherTest(6, false, new PublisherTestRun<T>() {
722      @Override
723      public void run(Publisher<T> pub) throws Throwable {
724        ManualSubscriber<T> sub = new ManualSubscriber<T>(env) {
725          @Override
726          public void onSubscribe(Subscription subs) {
727            this.subscription.completeImmediatly(subs);
728
729            subs.request(1);
730            subs.request(1);
731            subs.request(1);
732          }
733
734          @Override
735          public void onNext(T element) {
736            Subscription subs = this.subscription.value();
737            subs.request(1);
738          }
739        };
740
741        env.subscribe(pub, sub);
742
743        env.verifyNoAsyncErrors();
744      }
745    });
746  }
747
748  @Override @Test
749  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
750    final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1;
751
752    activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() {
753      @Override
754      public void run(Publisher<T> pub) throws Throwable {
755        final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() {
756          @Override
757          protected Long initialValue() {
758            return 0L;
759          }
760        };
761
762        final Latch runCompleted = new Latch(env);
763
764        final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
765          // counts the number of signals received, used to break out from possibly infinite request/onNext loops
766          long signalsReceived = 0L;
767
768          @Override
769          public void onNext(T element) {
770            // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
771            // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
772
773            signalsReceived += 1;
774            stackDepthCounter.set(stackDepthCounter.get() + 1);
775            env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
776
777            final long callsUntilNow = stackDepthCounter.get();
778            if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
779              env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
780                                     callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
781
782              // stop the recursive call chain
783              runCompleted.close();
784              return;
785            } else if (signalsReceived >= oneMoreThanBoundedLimit) {
786              // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
787              // stop the recursive call chain
788              runCompleted.close();
789              return;
790            }
791
792            // request more right away, the Publisher must break the recursion
793            subscription.value().request(1);
794
795            stackDepthCounter.set(stackDepthCounter.get() - 1);
796          }
797
798          @Override
799          public void onComplete() {
800            super.onComplete();
801            runCompleted.close();
802          }
803
804          @Override
805          public void onError(Throwable cause) {
806            super.onError(cause);
807            runCompleted.close();
808          }
809        };
810
811        try {
812          env.subscribe(pub, sub);
813
814          sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
815
816          final String msg = String.format("Unable to validate call stack depth safety, " +
817                                               "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
818                                           oneMoreThanBoundedLimit);
819          runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
820          env.verifyNoAsyncErrorsNoDelay();
821        } finally {
822          // since the request/onNext recursive calls may keep the publisher running "forever",
823          // we MUST cancel it manually before exiting this test case
824          sub.cancel();
825        }
826      }
827    });
828  }
829
830  @Override @Test
831  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
832    notVerified(); // cannot be meaningfully tested, or can it?
833  }
834
835  @Override @Test
836  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
837    notVerified(); // cannot be meaningfully tested, or can it?
838  }
839
840  @Override @Test
841  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
842    activePublisherTest(3, false, new PublisherTestRun<T>() {
843      @Override
844      public void run(Publisher<T> pub) throws Throwable {
845
846        // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the
847        // subscription once it's cancelled (as expected).
848        // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it.
849        ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
850          @Override
851          public void cancel() {
852            if (subscription.isCompleted()) {
853              subscription.value().cancel();
854            } else {
855              env.flop("Cannot cancel a subscription before having received it");
856            }
857          }
858        };
859
860        env.subscribe(pub, sub);
861
862        sub.cancel();
863        sub.request(1);
864        sub.request(1);
865        sub.request(1);
866
867        sub.expectNone();
868        env.verifyNoAsyncErrorsNoDelay();
869      }
870    });
871  }
872
873  @Override @Test
874  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
875    activePublisherTest(1, false, new PublisherTestRun<T>() {
876      @Override
877      public void run(Publisher<T> pub) throws Throwable {
878        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
879
880        // leak the Subscription
881        final Subscription subs = sub.subscription.value();
882
883        subs.cancel();
884        subs.cancel();
885        subs.cancel();
886
887        sub.expectNone();
888        env.verifyNoAsyncErrorsNoDelay();
889      }
890    });
891  }
892
893  @Override @Test
894  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
895    activePublisherTest(10, false, new PublisherTestRun<T>() {
896      @Override public void run(Publisher<T> pub) throws Throwable {
897        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
898        sub.request(0);
899        sub.expectError(IllegalArgumentException.class);
900      }
901    });
902  }
903
904  @Override @Test
905  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
906    activePublisherTest(10, false, new PublisherTestRun<T>() {
907      @Override
908      public void run(Publisher<T> pub) throws Throwable {
909        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
910        final Random r = new Random();
911        sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
912        // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
913        sub.expectError(IllegalArgumentException.class); 
914      }
915    });
916  }
917
918  @Override @Test
919  public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
920    optionalActivePublisherTest(10, false, new PublisherTestRun<T>() {
921      @Override
922      public void run(Publisher<T> pub) throws Throwable {
923        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
924        final Random r = new Random();
925        sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
926        // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem
927        sub.expectErrorWithMessage(IllegalArgumentException.class, Arrays.asList("3.9", "non-positive subscription request", "negative subscription request")); 
928      }
929    });
930  }
931
932  @Override @Test
933  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
934    // the publisher is able to signal more elements than the subscriber will be requesting in total
935    final int publisherElements = 20;
936
937    final int demand1 = 10;
938    final int demand2 = 5;
939    final int totalDemand = demand1 + demand2;
940
941    activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {
942      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
943      public void run(Publisher<T> pub) throws Throwable {
944        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
945
946        sub.request(demand1);
947        sub.request(demand2);
948
949        /*
950          NOTE: The order of the nextElement/cancel calls below is very important (!)
951
952          If this ordering was reversed, given an asynchronous publisher,
953          the following scenario would be *legal* and would break this test:
954
955          > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous
956          > AsyncPublisher receives request(5) - demand is now 15
957          ! AsyncPublisher didn't emit any onNext yet (!)
958          > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example
959          ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data
960          ! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher)
961
962          Which is why we must first expect an element, and then cancel, once the producing is "running".
963         */
964        sub.nextElement();
965        sub.cancel();
966
967        int onNextsSignalled = 1;
968
969        boolean stillBeingSignalled;
970        do {
971          // put asyncError if onNext signal received
972          sub.expectNone();
973          Throwable error = env.dropAsyncError();
974
975          if (error == null) {
976            stillBeingSignalled = false;
977          } else {
978            onNextsSignalled += 1;
979            stillBeingSignalled = true;
980          }
981
982          // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
983          assertTrue(onNextsSignalled <= totalDemand,
984                     String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d",
985                                   onNextsSignalled, totalDemand));
986
987        } while (stillBeingSignalled);
988      }
989    });
990
991    env.verifyNoAsyncErrorsNoDelay();
992  }
993
994  @Override @Test
995  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
996    final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>();
997
998    final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() {
999      @Override
1000      public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception {
1001        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1002        final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue);
1003
1004        sub.request(1);
1005        sub.nextElement();
1006        sub.cancel();
1007
1008        return ref;
1009      }
1010    };
1011
1012    activePublisherTest(3, false, new PublisherTestRun<T>() {
1013      @Override
1014      public void run(Publisher<T> pub) throws Throwable {
1015        final WeakReference<ManualSubscriber<T>> ref = run.apply(pub);
1016
1017        // cancel may be run asynchronously so we add a sleep before running the GC
1018        // to "resolve" the race
1019        Thread.sleep(publisherReferenceGCTimeoutMillis);
1020        System.gc();
1021
1022        if (!ref.equals(queue.remove(100))) {
1023          env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
1024        }
1025
1026        env.verifyNoAsyncErrorsNoDelay();
1027      }
1028    });
1029  }
1030
1031  @Override @Test
1032  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
1033    final int totalElements = 3;
1034
1035    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
1036      @Override
1037      public void run(Publisher<T> pub) throws Throwable {
1038        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1039        sub.request(Long.MAX_VALUE);
1040
1041        sub.nextElements(totalElements);
1042        sub.expectCompletion();
1043
1044        env.verifyNoAsyncErrorsNoDelay();
1045      }
1046    });
1047  }
1048
1049  @Override @Test
1050  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
1051    final int totalElements = 3;
1052
1053    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
1054      @Override
1055      public void run(Publisher<T> pub) throws Throwable {
1056        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1057        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
1058        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
1059        sub.request(1); // pending = Long.MAX_VALUE
1060
1061        sub.nextElements(totalElements);
1062        sub.expectCompletion();
1063
1064        try {
1065          env.verifyNoAsyncErrorsNoDelay();
1066        } finally {
1067          sub.cancel();
1068        }
1069        
1070      }
1071    });
1072  }
1073
1074  @Override @Test
1075  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
1076    activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
1077      @Override public void run(Publisher<T> pub) throws Throwable {
1078        final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
1079           // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
1080           // so 10 is relatively high and safe even if arbitrarily chosen
1081          int callsCounter = 10;
1082
1083          @Override
1084          public void onNext(T element) {
1085            env.debug(String.format("%s::onNext(%s)", this, element));
1086            if (subscription.isCompleted()) {
1087              if (callsCounter > 0) {
1088                subscription.value().request(Long.MAX_VALUE - 1);
1089                callsCounter--;
1090              } else {
1091                  subscription.value().cancel();
1092              }
1093            } else {
1094              env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
1095            }
1096          }
1097        };
1098        env.subscribe(pub, sub, env.defaultTimeoutMillis());
1099
1100        // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
1101        // we're pretty sure to overflow from those
1102        sub.request(1);
1103
1104        // no onError should be signalled
1105        try {
1106          env.verifyNoAsyncErrors();
1107        } finally {
1108          sub.cancel();
1109        }
1110      }
1111    });
1112  }
1113
1114  ///////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
1115
1116  ///////////////////// TEST INFRASTRUCTURE /////////////////////////////////
1117
1118  public interface PublisherTestRun<T> {
1119    public void run(Publisher<T> pub) throws Throwable;
1120  }
1121
1122  /**
1123   * Test for feature that SHOULD/MUST be implemented, using a live publisher.
1124   *
1125   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1126   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1127   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1128   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1129   */
1130  public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1131    if (elements > maxElementsFromPublisher()) {
1132      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1133    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1134      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1135                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1136    } else {
1137      Publisher<T> pub = createPublisher(elements);
1138      body.run(pub);
1139      env.verifyNoAsyncErrorsNoDelay();
1140    }
1141  }
1142
1143  /**
1144   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
1145   *
1146   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1147   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1148   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1149   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1150   */
1151  public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1152    if (elements > maxElementsFromPublisher()) {
1153      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1154    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1155      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1156                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1157    } else {
1158
1159      final Publisher<T> pub = createPublisher(elements);
1160      final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement.";
1161
1162      try {
1163        potentiallyPendingTest(pub, body);
1164      } catch (Exception ex) {
1165        notVerified(skipMessage);
1166      } catch (AssertionError ex) {
1167        notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage());
1168      }
1169    }
1170  }
1171
1172  public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE =
1173    "Skipping because no error state Publisher provided, and the test requires it. " +
1174          "Please implement PublisherVerification#createFailedPublisher to run this test.";
1175
1176  public static final String SKIPPING_OPTIONAL_TEST_FAILED =
1177    "Skipping, because provided Publisher does not pass this *additional* verification.";
1178  /**
1179   * Additional test for Publisher in error state
1180   */
1181  public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable {
1182    potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE);
1183  }
1184
1185  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable {
1186    potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED);
1187  }
1188
1189  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable {
1190    if (pub != null) {
1191      body.run(pub);
1192    } else {
1193      throw new SkipException(message);
1194    }
1195  }
1196
1197  /**
1198   * Executes a given test body {@code n} times.
1199   * All the test runs must pass in order for the stochastic test to pass.
1200   */
1201  public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable {
1202    if (skipStochasticTests()) {
1203      notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!");
1204    }
1205
1206    for (int i = 0; i < n; i++) {
1207      body.apply(i);
1208    }
1209  }
1210
1211  public void notVerified() {
1212    throw new SkipException("Not verified by this TCK.");
1213  }
1214
1215  /**
1216   * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher},
1217   * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data.
1218   *
1219   * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b>
1220   */
1221  public long publisherUnableToSignalOnComplete() {
1222    return Long.MAX_VALUE;
1223  }
1224
1225  public void notVerified(String message) {
1226    throw new SkipException(message);
1227  }
1228
1229}