001package org.reactivestreams.tck;
002
003import org.reactivestreams.Publisher;
004import org.reactivestreams.Subscriber;
005import org.reactivestreams.Subscription;
006import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport;
007import org.reactivestreams.tck.TestEnvironment.Latch;
008import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
009import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
010import org.reactivestreams.tck.support.Function;
011import org.reactivestreams.tck.support.Optional;
012import org.reactivestreams.tck.support.PublisherVerificationRules;
013import org.testng.SkipException;
014import org.testng.annotations.BeforeMethod;
015import org.testng.annotations.Test;
016
017import java.lang.Override;
018import java.lang.ref.ReferenceQueue;
019import java.lang.ref.WeakReference;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.List;
024import java.util.Random;
025import java.util.concurrent.atomic.AtomicReference;
026
027import static org.testng.Assert.assertEquals;
028import static org.testng.Assert.assertTrue;
029
030/**
031 * Provides tests for verifying {@code Publisher} specification rules.
032 *
033 * @see org.reactivestreams.Publisher
034 */
035public abstract class PublisherVerification<T> implements PublisherVerificationRules {
036
037  private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS";
038  private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L;
039
040  private final TestEnvironment env;
041
042  /**
043   * The amount of time after which a cancelled Subscriber reference should be dropped.
044   * See Rule 3.13 for details.
045   */
046  private final long publisherReferenceGCTimeoutMillis;
047
/**
 * Creates a verification which uses the given test environment and an explicitly configured GC timeout.
 *
 * @param env the test environment used by the tests of this class
 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
 */
public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
  this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis;
  this.env = env;
}
057
/**
 * Creates a verification which uses the given test environment and an environment-provided GC timeout.
 *
 * The value for {@code publisherReferenceGCTimeoutMillis} is obtained from
 * {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}.
 *
 * @param env the test environment used by the tests of this class
 * @throws java.lang.IllegalArgumentException when the timeout env variable is set but cannot be parsed as a long
 */
public PublisherVerification(TestEnvironment env) {
  this(env, envPublisherReferenceGCTimeoutMillis());
}
067
068  /**
069   * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present,
070   * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}).
071   *
072   * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
073   *
074   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
075   */
076  public static long envPublisherReferenceGCTimeoutMillis() {
077    final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV);
078    if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS;
079    else try {
080      return Long.parseLong(envMillis);
081    } catch(NumberFormatException ex) {
082      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex);
083    }
084  }
085
/**
 * This is the main method you must implement in your test incarnation.
 * It must create a Publisher for a stream with exactly the given number of elements.
 * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
 */
091  public abstract Publisher<T> createPublisher(long elements);
092
/**
 * By implementing this method, additional TCK tests concerning "failed" publishers will be run.
 *
 * The expected behaviour of the {@link Publisher} returned by this method is to hand out a subscription,
 * followed by signalling {@code onError} on it, as specified by Rule 1.9.
 *
 * If you want to skip these additional tests, return {@code null} from this method.
 */
101  public abstract Publisher<T> createFailedPublisher();
102
103
104  /**
105   * Override and return lower value if your Publisher is only able to produce a known number of elements.
106   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
107   *
108   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
109   *
110   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
111   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
112   */
113  public long maxElementsFromPublisher() {
114    return Long.MAX_VALUE - 1;
115  }
116
117  /**
118   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
119   * Such tests MAY sometimes fail even though the impl
120   */
121  public boolean skipStochasticTests() {
122    return false;
123  }
124
125  /**
126   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
127   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
128   * recursive calls to exceed the number returned by this method.
129   *
130   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
131   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
132   */
133  public long boundedDepthOfOnNextAndRequestRecursion() {
134    return 1;
135  }
136
137  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
138
139  @BeforeMethod
140  public void setUp() throws Exception {
141    env.clearAsyncErrors();
142  }
143
144  ////////////////////// TEST SETUP VERIFICATION //////////////////////////////
145
146  @Override @Test
147  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
148    activePublisherTest(1, true, new PublisherTestRun<T>() {
149      @Override
150      public void run(Publisher<T> pub) throws InterruptedException {
151        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
152        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
153        sub.requestEndOfStream();
154      }
155
156      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
157        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
158      }
159
160    });
161  }
162
163  @Override @Test
164  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
165    activePublisherTest(3, true, new PublisherTestRun<T>() {
166      @Override
167      public void run(Publisher<T> pub) throws InterruptedException {
168        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
169        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub));
170        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub));
171        assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub));
172        sub.requestEndOfStream();
173      }
174
175      Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException {
176        return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub));
177      }
178
179    });
180  }
181
182  @Override @Test
183  public void required_validate_maxElementsFromPublisher() throws Exception {
184    assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0");
185  }
186
187  @Override @Test
188  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
189    assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
190  }
191
192
193  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
194
195  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.1
196  @Override @Test
197  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
198    activePublisherTest(5, false, new PublisherTestRun<T>() {
199      @Override
200      public void run(Publisher<T> pub) throws InterruptedException {
201
202        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
203
204        sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
205        sub.request(1);
206        sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
207        sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
208
209        sub.request(1);
210        sub.request(2);
211        sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
212
213        sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
214      }
215    });
216  }
217
218  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.2
219  @Override @Test
220  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
221    final int elements = 3;
222    final int requested = 10;
223
224    activePublisherTest(elements, true, new PublisherTestRun<T>() {
225      @Override
226      public void run(Publisher<T> pub) throws Throwable {
227        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
228        sub.request(requested);
229        sub.nextElements(elements);
230        sub.expectCompletion();
231      }
232    });
233  }
234
235  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.3
236  @Override @Test
237  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
238    final int iterations = 100;
239    final int elements = 10;
240
241    stochasticTest(iterations, new Function<Integer, Void>() {
242      @Override
243      public Void apply(final Integer runNumber) throws Throwable {
244        activePublisherTest(elements, true, new PublisherTestRun<T>() {
245          @Override
246          public void run(Publisher<T> pub) throws Throwable {
247            final Latch completionLatch = new Latch(env);
248
249            pub.subscribe(new Subscriber<T>() {
250              private Subscription subs;
251              private long gotElements = 0;
252
253              private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();
254
255              /**
256               * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect
257               * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness.
258               *
259               * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call
260               * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough
261               * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread
262               * to enter subsequent critical sections ("nesting" them effectively).
263               */
264              final class ConcurrentAccessBarrier {
265                private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null);
266                private volatile String previousSignal = null;
267
268                public void enterSignal(String signalName) {
269                  if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) {
270                    env.flop(String.format(
271                      "Illegal concurrent access detected (entering critical section)! " +
272                        "%s emited %s signal, before %s finished its %s signal.",
273                        Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal));
274                  }
275                  this.previousSignal = signalName;
276                }
277
278                public void leaveSignal(String signalName) {
279                  currentlySignallingThread.set(null);
280                  this.previousSignal = signalName;
281                }
282
283                private boolean isSynchronousSignal() {
284                  return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get());
285                }
286
287              }
288
289              @Override
290              public void onSubscribe(Subscription s) {
291                final String signal = "onSubscribe()";
292                concurrentAccessBarrier.enterSignal(signal);
293
294                subs = s;
295                subs.request(1);
296
297                concurrentAccessBarrier.leaveSignal(signal);
298              }
299
300              @Override
301              public void onNext(T ignore) {
302                final String signal = String.format("onNext(%s)", ignore);
303                concurrentAccessBarrier.enterSignal(signal);
304
305                gotElements += 1;
306                if (gotElements <= elements) // requesting one more than we know are in the stream (some Publishers need this)
307                  subs.request(1);
308
309                concurrentAccessBarrier.leaveSignal(signal);
310              }
311
312              @Override
313              public void onError(Throwable t) {
314                final String signal = String.format("onError(%s)", t.getMessage());
315                concurrentAccessBarrier.enterSignal(signal);
316
317                // ignore value
318
319                concurrentAccessBarrier.leaveSignal(signal);
320              }
321
322              @Override
323              public void onComplete() {
324                final String signal = "onComplete()";
325                concurrentAccessBarrier.enterSignal(signal);
326
327                // entering for completeness
328
329                concurrentAccessBarrier.leaveSignal(signal);
330                completionLatch.close();
331              }
332            });
333
334            completionLatch.expectClose(elements * env.defaultTimeoutMillis(), "Expected 10 elements to be drained");
335          }
336        });
337        return null;
338      }
339    });
340  }
341
342  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4
343  @Override @Test
344  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
345    try {
346      whenHasErrorPublisherTest(new PublisherTestRun<T>() {
347        @Override
348        public void run(final Publisher<T> pub) throws InterruptedException {
349          final Latch onErrorlatch = new Latch(env);
350          final Latch onSubscribeLatch = new Latch(env);
351          pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
352            @Override
353            public void onSubscribe(Subscription subs) {
354              onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
355              onSubscribeLatch.close();
356            }
357            @Override
358            public void onError(Throwable cause) {
359              onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
360              onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
361              onErrorlatch.close();
362            }
363          });
364
365          onSubscribeLatch.expectClose("Should have received onSubscribe");
366          onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
367
368          env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
369          }
370      });
371    } catch (SkipException se) {
372      throw se;
373    } catch (Throwable ex) {
374      // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe
375      // which was wrong of him - he should have signalled on error using onError
376      throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex);
377    }
378  }
379
380  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.5
381  @Override @Test
382  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
383    activePublisherTest(3, true, new PublisherTestRun<T>() {
384      @Override
385      public void run(Publisher<T> pub) throws Throwable {
386        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
387        sub.requestNextElement();
388        sub.requestNextElement();
389        sub.requestNextElement();
390        sub.requestEndOfStream();
391        sub.expectNone();
392      }
393    });
394  }
395
396  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.5
397  @Override @Test
398  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
399    optionalActivePublisherTest(0, true, new PublisherTestRun<T>() {
400      @Override
401      public void run(Publisher<T> pub) throws Throwable {
402        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
403        sub.request(1);
404        sub.expectCompletion();
405        sub.expectNone();
406      }
407    });
408  }
409
410  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.6
411  @Override @Test
412  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
413    notVerified(); // not really testable without more control over the Publisher
414  }
415
416  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.7
417  @Override @Test
418  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
419    activePublisherTest(1, true, new PublisherTestRun<T>() {
420      @Override
421      public void run(Publisher<T> pub) throws Throwable {
422        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
423        sub.request(10);
424        sub.nextElement();
425        sub.expectCompletion();
426
427        sub.request(10);
428        sub.expectNone();
429      }
430    });
431  }
432
433  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.7
434  @Override @Test
435  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
436    notVerified(); // can we meaningfully test this, without more control over the publisher?
437  }
438
439  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.8
440  @Override @Test
441  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
442    notVerified(); // can we meaningfully test this?
443  }
444
445  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9
446  @Override @Test
447  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
448    notVerified(); // can we meaningfully test this?
449  }
450
451  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9
452  @Override @Test
453  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
454    activePublisherTest(0, false, new PublisherTestRun<T>() {
455      @Override
456      public void run(Publisher<T> pub) throws Throwable {
457        try {
458            pub.subscribe(null);
459            env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
460        } catch (NullPointerException ignored) {
461          // valid behaviour
462        }
463        env.verifyNoAsyncErrorsNoDelay();
464      }
465    });
466  }
467
468  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9
469  @Override @Test
470  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
471    activePublisherTest(0, false, new PublisherTestRun<T>() {
472      @Override
473      public void run(Publisher<T> pub) throws Throwable {
474        final Latch onSubscribeLatch = new Latch(env);
475        pub.subscribe(new Subscriber<T>() {
476          @Override
477          public void onError(Throwable cause) {
478            onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
479          }
480
481          @Override
482          public void onSubscribe(Subscription subs) {
483            onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
484            onSubscribeLatch.close();
485          }
486
487          @Override
488          public void onNext(T elem) {
489            onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
490          }
491
492          @Override
493          public void onComplete() {
494            onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
495          }
496        });
497        onSubscribeLatch.expectClose("Should have received onSubscribe");
498        env.verifyNoAsyncErrorsNoDelay();
499      }
500    });
501  }
502
503  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9
504  @Override @Test
505  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
506    whenHasErrorPublisherTest(new PublisherTestRun<T>() {
507      @Override
508      public void run(Publisher<T> pub) throws Throwable {
509        final Latch onErrorLatch = new Latch(env);
510        final Latch onSubscribeLatch = new Latch(env);
511        ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
512          @Override
513          public void onError(Throwable cause) {
514            onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
515            onErrorLatch.assertOpen("Only one onError call expected");
516            onErrorLatch.close();
517          }
518
519          @Override
520          public void onSubscribe(Subscription subs) {
521            onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
522            onSubscribeLatch.close();
523          }
524        };
525        pub.subscribe(sub);
526        onSubscribeLatch.expectClose("Should have received onSubscribe");
527        onErrorLatch.expectClose("Should have received onError");
528
529        env.verifyNoAsyncErrorsNoDelay();
530      }
531    });
532  }
533
534    // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.10
535  @Override @Test
536  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
537    notVerified(); // can we meaningfully test this?
538  }
539
540  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
541  @Override @Test
542  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
543    optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
544      @Override
545      public void run(Publisher<T> pub) throws Throwable {
546        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
547        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
548
549        env.verifyNoAsyncErrors();
550      }
551    });
552  }
553
554  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
555  @Override @Test
556  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
557    optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
558      @Override
559      public void run(Publisher<T> pub) throws InterruptedException {
560        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
561        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
562        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
563
564        sub1.request(1);
565        T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
566        sub2.request(2);
567        List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub));
568        sub1.request(1);
569        T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
570        sub3.request(3);
571        List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub));
572        sub3.request(1);
573        T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
574        sub3.request(1);
575        T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
576        sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub));
577        sub2.request(3);
578        List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub));
579        sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub));
580        sub1.request(2);
581        List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub));
582        sub1.request(1);
583        T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
584        sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub));
585
586        @SuppressWarnings("unchecked")
587        List<T> r = new ArrayList<T>(Arrays.asList(x1, x2));
588        r.addAll(x3);
589        r.addAll(Collections.singleton(x4));
590
591        List<T> check1 = new ArrayList<T>(y1);
592        check1.addAll(y2);
593
594        //noinspection unchecked
595        List<T> check2 = new ArrayList<T>(z1);
596        check2.add(z2);
597        check2.add(z3);
598
599        assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub));
600        assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub));
601      }
602    });
603  }
604
605  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
606  @Override @Test
607  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
608    optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
609      @Override
610      public void run(Publisher<T> pub) throws Throwable {
611        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
612        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
613        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
614
615        List<T> received1 = new ArrayList<T>();
616        List<T> received2 = new ArrayList<T>();
617        List<T> received3 = new ArrayList<T>();
618
619        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
620        // edgy edge case?
621        sub1.request(4);
622        sub2.request(4);
623        sub3.request(4);
624
625        received1.addAll(sub1.nextElements(3));
626        received2.addAll(sub2.nextElements(3));
627        received3.addAll(sub3.nextElements(3));
628
629        // NOTE: can't check completion, the Publisher may not be able to signal it
630        //       a similar test *with* completion checking is implemented
631
632        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
633        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
634      }
635    });
636  }
637
638  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
639  @Override @Test
640  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
641    optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
642      @Override
643      public void run(Publisher<T> pub) throws Throwable {
644        ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
645        ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
646        ManualSubscriber<T> sub3 = env.newManualSubscriber(pub);
647
648        List<T> received1 = new ArrayList<T>();
649        List<T> received2 = new ArrayList<T>();
650        List<T> received3 = new ArrayList<T>();
651
652        // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains...
653        // edgy edge case?
654        sub1.request(4);
655        sub2.request(4);
656        sub3.request(4);
657
658        received1.addAll(sub1.nextElements(3));
659        received2.addAll(sub2.nextElements(3));
660        received3.addAll(sub3.nextElements(3));
661
662        sub1.expectCompletion();
663        sub2.expectCompletion();
664        sub3.expectCompletion();
665
666        assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"));
667        assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers"));
668      }
669    });
670  }
671
672  ///////////////////// SUBSCRIPTION TESTS //////////////////////////////////
673
674  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.2
675  @Override @Test
676  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
677    activePublisherTest(6, false, new PublisherTestRun<T>() {
678      @Override
679      public void run(Publisher<T> pub) throws Throwable {
680        ManualSubscriber<T> sub = new ManualSubscriber<T>(env) {
681          @Override
682          public void onSubscribe(Subscription subs) {
683            this.subscription.completeImmediatly(subs);
684
685            subs.request(1);
686            subs.request(1);
687            subs.request(1);
688          }
689
690          @Override
691          public void onNext(T element) {
692            Subscription subs = this.subscription.value();
693            subs.request(1);
694          }
695        };
696
697        env.subscribe(pub, sub);
698
699        long delay = env.defaultTimeoutMillis();
700        env.verifyNoAsyncErrors(delay);
701      }
702    });
703  }
704
705  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.3
706  @Override @Test
707  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
708    final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1;
709
710    activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() {
711      @Override
712      public void run(Publisher<T> pub) throws Throwable {
713        final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() {
714          @Override
715          protected Long initialValue() {
716            return 0L;
717          }
718        };
719
720        final Latch runCompleted = new Latch(env);
721
722        final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
723          // counts the number of signals received, used to break out from possibly infinite request/onNext loops
724          long signalsReceived = 0L;
725
726          @Override
727          public void onNext(T element) {
728            // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements
729            // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing
730
731            signalsReceived += 1;
732            stackDepthCounter.set(stackDepthCounter.get() + 1);
733            env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
734
735            final long callsUntilNow = stackDepthCounter.get();
736            if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
737              env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d",
738                                     callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion()));
739
740              // stop the recursive call chain
741              runCompleted.close();
742              return;
743            } else if (signalsReceived >= oneMoreThanBoundedLimit) {
744              // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and
745              // stop the recursive call chain
746              runCompleted.close();
747              return;
748            }
749
750            // request more right away, the Publisher must break the recursion
751            subscription.value().request(1);
752
753            stackDepthCounter.set(stackDepthCounter.get() - 1);
754          }
755
756          @Override
757          public void onComplete() {
758            super.onComplete();
759            runCompleted.close();
760          }
761
762          @Override
763          public void onError(Throwable cause) {
764            super.onError(cause);
765            runCompleted.close();
766          }
767        };
768
769        try {
770          env.subscribe(pub, sub);
771
772          sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...`
773
774          final String msg = String.format("Unable to validate call stack depth safety, " +
775                                               "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion",
776                                           oneMoreThanBoundedLimit);
777          runCompleted.expectClose(env.defaultTimeoutMillis(), msg);
778          env.verifyNoAsyncErrorsNoDelay();
779        } finally {
780          // since the request/onNext recursive calls may keep the publisher running "forever",
781          // we MUST cancel it manually before exiting this test case
782          sub.cancel();
783        }
784      }
785    });
786  }
787
788  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.4
789  @Override @Test
790  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
791    notVerified(); // cannot be meaningfully tested, or can it?
792  }
793
794  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.5
795  @Override @Test
796  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception {
797    notVerified(); // cannot be meaningfully tested, or can it?
798  }
799
800  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.6
801  @Override @Test
802  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
803    activePublisherTest(3, false, new PublisherTestRun<T>() {
804      @Override
805      public void run(Publisher<T> pub) throws Throwable {
806
807        // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the
808        // subscription once it's cancelled (as expected).
809        // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it.
810        ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
811          @Override
812          public void cancel() {
813            if (subscription.isCompleted()) {
814              subscription.value().cancel();
815            } else {
816              env.flop("Cannot cancel a subscription before having received it");
817            }
818          }
819        };
820
821        env.subscribe(pub, sub);
822
823        sub.cancel();
824        sub.request(1);
825        sub.request(1);
826        sub.request(1);
827
828        sub.expectNone();
829        env.verifyNoAsyncErrorsNoDelay();
830      }
831    });
832  }
833
834  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.7
835  @Override @Test
836  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
837    activePublisherTest(1, false, new PublisherTestRun<T>() {
838      @Override
839      public void run(Publisher<T> pub) throws Throwable {
840        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
841
842        // leak the Subscription
843        final Subscription subs = sub.subscription.value();
844
845        subs.cancel();
846        subs.cancel();
847        subs.cancel();
848
849        sub.expectNone();
850        env.verifyNoAsyncErrorsNoDelay();
851      }
852    });
853  }
854
855  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.9
856  @Override @Test
857  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
858    activePublisherTest(10, false, new PublisherTestRun<T>() {
859      @Override public void run(Publisher<T> pub) throws Throwable {
860        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
861        sub.request(0);
862        sub.expectErrorWithMessage(IllegalArgumentException.class, "3.9"); // we do require implementations to mention the rule number at the very least
863      }
864    });
865  }
866
867  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.9
868  @Override @Test
869  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
870    activePublisherTest(10, false, new PublisherTestRun<T>() {
871      @Override
872      public void run(Publisher<T> pub) throws Throwable {
873        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
874        final Random r = new Random();
875        sub.request(-r.nextInt(Integer.MAX_VALUE));
876        sub.expectErrorWithMessage(IllegalArgumentException.class, "3.9"); // we do require implementations to mention the rule number at the very least
877      }
878    });
879  }
880
881  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.12
882  @Override @Test
883  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
884    // the publisher is able to signal more elements than the subscriber will be requesting in total
885    final int publisherElements = 20;
886
887    final int demand1 = 10;
888    final int demand2 = 5;
889    final int totalDemand = demand1 + demand2;
890
891    activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {
892      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
893      public void run(Publisher<T> pub) throws Throwable {
894        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
895
896        sub.request(demand1);
897        sub.request(demand2);
898
899        /*
900          NOTE: The order of the nextElement/cancel calls below is very important (!)
901
902          If this ordering was reversed, given an asynchronous publisher,
903          the following scenario would be *legal* and would break this test:
904
905          > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous
906          > AsyncPublisher receives request(5) - demand is now 15
907          ! AsyncPublisher didn't emit any onNext yet (!)
908          > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example
909          ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data
910          ! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher)
911
912          Which is why we must first expect an element, and then cancel, once the producing is "running".
913         */
914        sub.nextElement();
915        sub.cancel();
916
917        int onNextsSignalled = 1;
918
919        boolean stillBeingSignalled;
920        do {
921          // put asyncError if onNext signal received
922          sub.expectNone();
923          Throwable error = env.dropAsyncError();
924
925          if (error == null) {
926            stillBeingSignalled = false;
927          } else {
928            onNextsSignalled += 1;
929            stillBeingSignalled = true;
930          }
931
932          // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
933          assertTrue(onNextsSignalled <= totalDemand,
934                     String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d",
935                                   onNextsSignalled, totalDemand));
936
937        } while (stillBeingSignalled);
938      }
939    });
940
941    env.verifyNoAsyncErrorsNoDelay();
942  }
943
944  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.13
945  @Override @Test
946  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
947    final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>();
948
949    final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() {
950      @Override
951      public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception {
952        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
953        final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue);
954
955        sub.request(1);
956        sub.nextElement();
957        sub.cancel();
958
959        return ref;
960      }
961    };
962
963    activePublisherTest(3, false, new PublisherTestRun<T>() {
964      @Override
965      public void run(Publisher<T> pub) throws Throwable {
966        final WeakReference<ManualSubscriber<T>> ref = run.apply(pub);
967
968        // cancel may be run asynchronously so we add a sleep before running the GC
969        // to "resolve" the race
970        Thread.sleep(publisherReferenceGCTimeoutMillis);
971        System.gc();
972
973        if (!ref.equals(queue.remove(100))) {
974          env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
975        }
976
977        env.verifyNoAsyncErrorsNoDelay();
978      }
979    });
980  }
981
982  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.17
983  @Override @Test
984  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
985    final int totalElements = 3;
986
987    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
988      @Override
989      public void run(Publisher<T> pub) throws Throwable {
990        ManualSubscriber<T> sub = env.newManualSubscriber(pub);
991        sub.request(Long.MAX_VALUE);
992
993        sub.nextElements(totalElements);
994        sub.expectCompletion();
995
996        env.verifyNoAsyncErrorsNoDelay();
997      }
998    });
999  }
1000
1001  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.17
1002  @Override @Test
1003  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
1004    final int totalElements = 3;
1005
1006    activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
1007      @Override
1008      public void run(Publisher<T> pub) throws Throwable {
1009        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
1010        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
1011        sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
1012        sub.request(1); // pending = Long.MAX_VALUE
1013
1014        sub.nextElements(totalElements);
1015        sub.expectCompletion();
1016
1017        try {
1018          env.verifyNoAsyncErrorsNoDelay();
1019        } finally {
1020          sub.cancel();
1021        }
1022        
1023      }
1024    });
1025  }
1026
1027  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.17
1028  @Override @Test
1029  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
1030    activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
1031      @Override public void run(Publisher<T> pub) throws Throwable {
1032        final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
1033           // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
1034           // so 10 is relatively high and safe even if arbitrarily chosen
1035          int callsCounter = 10;
1036
1037          @Override
1038          public void onNext(T element) {
1039            env.debug(String.format("%s::onNext(%s)", this, element));
1040            if (subscription.isCompleted()) {
1041              if (callsCounter > 0) {
1042                subscription.value().request(Long.MAX_VALUE - 1);
1043                callsCounter--;
1044              } else {
1045                  subscription.value().cancel();
1046              }
1047            } else {
1048              env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
1049            }
1050          }
1051        };
1052        env.subscribe(pub, sub, env.defaultTimeoutMillis());
1053
1054        // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
1055        // we're pretty sure to overflow from those
1056        sub.request(1);
1057
1058        // no onError should be signalled
1059        try {
1060          env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
1061        } finally {
1062          sub.cancel();
1063        }
1064      }
1065    });
1066  }
1067
1068  ///////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
1069
1070  ///////////////////// TEST INFRASTRUCTURE /////////////////////////////////
1071
1072  public interface PublisherTestRun<T> {
1073    public void run(Publisher<T> pub) throws Throwable;
1074  }
1075
1076  /**
1077   * Test for feature that SHOULD/MUST be implemented, using a live publisher.
1078   *
1079   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1080   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1081   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1082   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1083   */
1084  public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1085    if (elements > maxElementsFromPublisher()) {
1086      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1087    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1088      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1089                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1090    } else {
1091      Publisher<T> pub = createPublisher(elements);
1092      body.run(pub);
1093      env.verifyNoAsyncErrorsNoDelay();
1094    }
1095  }
1096
1097  /**
1098   * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails.
1099   *
1100   * @param elements the number of elements the Publisher under test  must be able to emit to run this test
1101   * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run.
1102   *                                 If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped.
1103   *                                 To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}.
1104   */
1105  public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
1106    if (elements > maxElementsFromPublisher()) {
1107      throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher()));
1108    } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) {
1109      throw new SkipException("Unable to run this test, as it requires an onComplete signal, " +
1110                                "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
1111    } else {
1112
1113      final Publisher<T> pub = createPublisher(elements);
1114      final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement.";
1115
1116      try {
1117        potentiallyPendingTest(pub, body);
1118      } catch (Exception ex) {
1119        notVerified(skipMessage);
1120      } catch (AssertionError ex) {
1121        notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage());
1122      }
1123    }
1124  }
1125
1126  public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE =
1127    "Skipping because no error state Publisher provided, and the test requires it. " +
1128          "Please implement PublisherVerification#createFailedPublisher to run this test.";
1129
1130  public static final String SKIPPING_OPTIONAL_TEST_FAILED =
1131    "Skipping, because provided Publisher does not pass this *additional* verification.";
1132  /**
1133   * Additional test for Publisher in error state
1134   */
1135  public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable {
1136    potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE);
1137  }
1138
1139  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable {
1140    potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED);
1141  }
1142
1143  public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable {
1144    if (pub != null) {
1145      body.run(pub);
1146    } else {
1147      throw new SkipException(message);
1148    }
1149  }
1150
1151  /**
1152   * Executes a given test body {@code n} times.
1153   * All the test runs must pass in order for the stochastic test to pass.
1154   */
1155  public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable {
1156    if (skipStochasticTests()) {
1157      notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!");
1158    }
1159
1160    for (int i = 0; i < n; i++) {
1161      body.apply(i);
1162    }
1163  }
1164
1165  public void notVerified() {
1166    throw new SkipException("Not verified by this TCK.");
1167  }
1168
1169  /**
1170   * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher},
1171   * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data.
1172   *
1173   * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b>
1174   */
1175  public long publisherUnableToSignalOnComplete() {
1176    return Long.MAX_VALUE;
1177  }
1178
1179  public void notVerified(String message) {
1180    throw new SkipException(message);
1181  }
1182
1183}