001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Processor;
015import org.reactivestreams.Publisher;
016import org.reactivestreams.Subscriber;
017import org.reactivestreams.Subscription;
018import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
021import org.reactivestreams.tck.TestEnvironment.Promise;
022import org.reactivestreams.tck.flow.support.Function;
023import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
024import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
025import org.testng.annotations.BeforeMethod;
026import org.testng.annotations.Test;
027
028import java.util.HashSet;
029import java.util.Set;
030
031public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
032  implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
033
  // test environment handed to both delegate verifications and to the tests in this class
  private final TestEnvironment env;

  ////////////////////// DELEGATED TO SPECS //////////////////////

  // for delegating tests
  private final SubscriberWhiteboxVerification<T> subscriberVerification;

  // for delegating tests
  private final PublisherVerification<T> publisherVerification;

  ////////////////// END OF DELEGATED TO SPECS //////////////////

  // number of elements the processor under test must be able to buffer,
  // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`.
  private final int processorBufferSize;
049
  /**
   * Creates a verification that takes the publisher reference GC timeout from
   * {@code PublisherVerification.envPublisherReferenceGCTimeoutMillis()}.
   *
   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
   *
   * @param env test environment passed on to the three-argument constructor.
   */
  @SuppressWarnings("unused")
  public IdentityProcessorVerification(final TestEnvironment env) {
    this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE);
  }
060
  /**
   * Test class must specify the expected time it takes for the publisher to
   * shut itself down when the last downstream {@code Subscription} is cancelled.
   *
   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
   *
   * @param env                               test environment passed on to the three-argument constructor.
   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
   */
  @SuppressWarnings("unused")
  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
    this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE);
  }
073
074  /**
075   * Test class must specify the expected time it takes for the publisher to
076   * shut itself down when the the last downstream {@code Subscription} is cancelled.
077   *
078   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
079   * @param processorBufferSize            number of elements the processor is required to be able to buffer.
080   */
081  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) {
082    this.env = env;
083    this.processorBufferSize = processorBufferSize;
084
085    this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) {
086      @Override
087      public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
088        return IdentityProcessorVerification.this.createSubscriber(probe);
089      }
090
091      @Override public T createElement(int element) {
092        return IdentityProcessorVerification.this.createElement(element);
093      }
094
095      @Override
096      public Publisher<T> createHelperPublisher(long elements) {
097        return IdentityProcessorVerification.this.createHelperPublisher(elements);
098      }
099    };
100
101    publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) {
102      @Override
103      public Publisher<T> createPublisher(long elements) {
104        return IdentityProcessorVerification.this.createPublisher(elements);
105      }
106
107      @Override
108      public Publisher<T> createFailedPublisher() {
109        return IdentityProcessorVerification.this.createFailedPublisher();
110      }
111
112      @Override
113      public long maxElementsFromPublisher() {
114        return IdentityProcessorVerification.this.maxElementsFromPublisher();
115      }
116
117      @Override
118      public long boundedDepthOfOnNextAndRequestRecursion() {
119        return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
120      }
121
122      @Override
123      public boolean skipStochasticTests() {
124        return IdentityProcessorVerification.this.skipStochasticTests();
125      }
126    };
127  }
128
  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a {@link Processor}, which simply forwards all stream elements from its upstream
   * to its downstream. It must be able to internally buffer the given number of elements.
   * <p>
   * Within this class it is invoked by {@link #createPublisher(long)} and by the subscriber factory
   * {@code createSubscriber}, each time with the configured processor buffer size.
   *
   * @param bufferSize number of elements the processor is required to be able to buffer.
   * @return the identity {@link Processor} to be verified.
   */
  public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
137
  /**
   * By implementing this method, additional TCK tests concerning a "failed" publisher will be run.
   *
   * The expected behaviour of the {@link Publisher} returned by this method is to hand out a subscription,
   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
   *
   * If you want to ignore these additional tests, return {@code null} from this method.
   *
   * @return a failed {@link Publisher}, or {@code null} to ignore the additional tests.
   */
  public abstract Publisher<T> createFailedPublisher();
147
  /**
   * Override and return a lower value if your Publisher is only able to produce a known number of elements.
   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
   *
   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can produce a huge but NOT an unbounded number of elements.
   *
   * To mark that your Publisher will *never* signal an {@code onComplete}, override this method and return {@code Long.MAX_VALUE},
   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
   *
   * The value is also what the internal {@link PublisherVerification} reports from its own method of the same name.
   *
   * @return the maximum number of elements the Publisher can produce; {@code Long.MAX_VALUE - 1} unless overridden.
   */
  public long maxElementsFromPublisher() {
    return Long.MAX_VALUE - 1;
  }
160
  /**
   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
   * recursive calls to exceed the number returned by this method.
   *
   * The value is also what the internal {@link PublisherVerification} reports from its own method of the same name.
   *
   * @return the allowed recursion depth; {@code 1} unless overridden.
   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
   */
  public long boundedDepthOfOnNextAndRequestRecursion() {
    return 1;
  }
172
  /**
   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
   * Such tests MAY sometimes fail even though the implementation under test is correct
   * (NOTE(review): the original sentence was cut off after "impl"; this completion is presumed).
   *
   * The value is also what the internal {@link PublisherVerification} reports from its own method of the same name.
   *
   * @return {@code true} to skip stochastic tests; {@code false} unless overridden.
   */
  public boolean skipStochasticTests() {
    return false;
  }
180
  /**
   * Describes the tested implementation in terms of how many subscribers it can support.
   * Some tests require the {@code Publisher} under test to support multiple Subscribers,
   * yet the spec does not require all publishers to be able to do so, thus – if an implementation
   * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
   * you MUST return that number from this method by overriding it.
   *
   * @return the maximum number of concurrent subscribers supported; {@code Long.MAX_VALUE} unless overridden.
   */
  public long maxSupportedSubscribers() {
      return Long.MAX_VALUE;
  }
191
  /**
   * Override this method and return {@code true} if the {@link Processor} returned by the
   * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s
   * request amounts and only delivers onNext signals if all Subscribers have
   * indicated (via their Subscription#request(long)) they are ready to receive elements.
   *
   * Multi-subscriber tests in this class consult this flag to decide whether every test
   * subscriber has to request before an element is expected.
   *
   * @return {@code true} if emission is coordinated across subscribers; {@code false} unless overridden.
   */
  public boolean doesCoordinatedEmission() {
    return false;
  }
201
202  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
203
  /**
   * Runs before every test method and invokes {@code setUp()} on the publisher verification
   * first, then on the subscriber verification.
   *
   * @throws Exception if either delegate's {@code setUp()} throws.
   */
  @BeforeMethod
  public void setUp() throws Exception {
    publisherVerification.setUp();
    subscriberVerification.setUp();
  }
209
210  ////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
211
212  // A Processor
213  //   must obey all Publisher rules on its publishing side
214  public Publisher<T> createPublisher(long elements) {
215    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
216    final Publisher<T> pub = createHelperPublisher(elements);
217    pub.subscribe(processor);
218    return processor; // we run the PublisherVerification against this
219  }
220
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_validate_maxElementsFromPublisher() throws Exception {
    publisherVerification.required_validate_maxElementsFromPublisher();
  }
225
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
    publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion();
  }
230
231  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
232  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
233
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Test
  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
    publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element();
  }
238
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Test
  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
    publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements();
  }
243
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
    publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
  }
248
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
    publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription();
  }
253
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
    publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially();
  }
258
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
    publisherVerification.optional_spec104_mustSignalOnErrorWhenFails();
  }
263
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
    publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
  }
268
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
    publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
  }
273
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
    publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();
  }
278
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
    publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled();
  }
283
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
    publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled();
  }
288
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
    publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals();
  }
293
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
    publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
  }
298
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
    publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
  }
303
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
    publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
  }
308
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
    publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
  }
313
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
    publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
  }
318
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
    publisherVerification.optional_spec111_maySupportMultiSubscribe();
  }
323
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
    publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals();
  }
328
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
  }
333
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
  }
338
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
  }
343
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
    publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe();
  }
348
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
    publisherVerification.required_spec303_mustNotAllowUnboundedRecursion();
  }
353
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
    publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations();
  }
358
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
    publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation();
  }
363
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
    publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops();
  }
368
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
    publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops();
  }
373
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
    publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException();
  }
378
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
    publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException();
  }
383  
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
    publisherVerification.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage();
  }
388
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
    publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
  }
393
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
    publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
  }
398
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
    publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue();
  }
403
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
    publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
  }
408
  /**
   * Delegates to the same-named test of the internal {@link PublisherVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
    publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
  }
413
414
415  /**
416   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s
417   * receive the same items and a terminal {@code Exception}.
418   * <p>
419   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
420   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
421   * <p>
422   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple
423   * {@code Subscriber}s.
424   * <p>
425   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
426   * <p>
427   * If this test fails, the following could be checked within the {@code Processor} implementation:
428   * <ul>
429   * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
430   * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
431   * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
432   * both have to request first.</li>
433   * </ul>
434   */
435  @Test
436  public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
437    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
438      @Override
439      public TestSetup apply(Long aLong) throws Throwable {
440        return new TestSetup(env, processorBufferSize) {{
441          final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
442          env.subscribe(processor, sub1);
443
444          final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
445          env.subscribe(processor, sub2);
446
447          final Exception ex = new RuntimeException("Test exception");
448
449          if (doesCoordinatedEmission()) {
450            sub1.request(1);
451            sub2.request(1);
452
453            expectRequest();
454
455            final T x = sendNextTFromUpstream();
456
457            expectNextElement(sub1, x);
458            expectNextElement(sub2, x);
459
460            sub1.request(1);
461            sub2.request(1);
462          } else {
463            sub1.request(1);
464
465            expectRequest(env.defaultTimeoutMillis(),
466                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
467                    + " at once, please override doesCoordinatedEmission() to return true in this "
468                    + "IdentityProcessorVerification to allow this test to pass.");
469
470            final T x = sendNextTFromUpstream();
471            expectNextElement(sub1, x,
472                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
473                            + " at once, please override doesCoordinatedEmission() to return true in this "
474                            + "IdentityProcessorVerification to allow this test to pass.");
475
476            sub1.request(1);
477
478            // sub1 has received one element, and has one demand pending
479            // sub2 has not yet requested anything
480          }
481          sendError(ex);
482
483          sub1.expectError(ex);
484          sub2.expectError(ex);
485
486          env.verifyNoAsyncErrorsNoDelay();
487        }};
488      }
489    });
490  }
491
492  ////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
493  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
494
495  // A Processor
496  //   must obey all Subscriber rules on its consuming side
497  public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) {
498    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
499    processor.subscribe(
500        new Subscriber<T>() {
501          private final Promise<Subscription> subs = new Promise<Subscription>(env);
502
503          @Override
504          public void onSubscribe(final Subscription subscription) {
505            env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
506            if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
507
508            probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
509
510              @Override
511              public void triggerRequest(long elements) {
512                subscription.request(elements);
513              }
514
515              @Override
516              public void signalCancel() {
517                subscription.cancel();
518              }
519            });
520          }
521
522          @Override
523          public void onNext(T element) {
524            env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
525            probe.registerOnNext(element);
526          }
527
528          @Override
529          public void onComplete() {
530            env.debug("whiteboxSubscriber::onComplete()");
531            probe.registerOnComplete();
532          }
533
534          @Override
535          public void onError(Throwable cause) {
536            env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
537            probe.registerOnError(cause);
538          }
539        });
540
541    return processor; // we run the SubscriberVerification against this
542  }
543
544  ////////////////////// OTHER RULE VERIFICATION ///////////////////////////
545
546  // A Processor
547  //   must immediately pass on `onError` events received from its upstream to its downstream
548  @Test
549  public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
550    new TestSetup(env, processorBufferSize) {{
551      final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
552      env.subscribe(processor, sub);
553
554      final Exception ex = new RuntimeException("Test exception");
555      sendError(ex);
556      sub.expectError(ex); // "immediately", i.e. without a preceding request
557
558      env.verifyNoAsyncErrorsNoDelay();
559    }};
560  }
561
562  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
563  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
564
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Test
  public void required_exerciseWhiteboxHappyPath() throws Throwable {
    subscriberVerification.required_exerciseWhiteboxHappyPath();
  }
569
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
    subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest();
  }
574
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
    subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch();
  }
579
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete();
  }
584
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError();
  }
589
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
    subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError();
  }
594
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
    subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
  }
599
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
    subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid();
  }
604
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
    subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization();
  }
609
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
    subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
  }
614
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
  }
619
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall();
  }
624
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall();
  }
629
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall();
  }
634
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
    subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
  }
639
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
    subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation();
  }
644
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec213_failingOnSignalInvocation() throws Exception {
    subscriberVerification.untested_spec213_failingOnSignalInvocation();
  }
649
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull();
  }
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull();
  }
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull();
  }
662
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
    subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();
  }
667
  /**
   * Delegates to the same-named test of the internal {@link SubscriberWhiteboxVerification},
   * whose factory methods call back into this class.
   */
  @Override @Test
  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
    subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
  }
672
673  @Override @Test
674  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
675    subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber();
676  }
677
678  @Override @Test
679  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
680    subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
681  }
682
683  @Override @Test
684  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
685    subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
686  }
687
688  @Override @Test
689  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
690    subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError();
691  }
692
693  @Override @Test
694  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
695    subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber();
696  }
697
698  /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
699
700  /**
701   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests
702   * from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}.
703   * <p>
704   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
705   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
706   * <p>
707   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>2.1</a> with multiple
708   * {@code Subscriber}s.
709   * <p>
710   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
711   * <p>
712   * If this test fails, the following could be checked within the {@code Processor} implementation:
713   * <ul>
714   * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
715   * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
716   * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
717   * both have to request first.</li>
718   * </ul>
719   */
720  @Test
721  public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
722    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
723      @Override
724      public TestSetup apply(Long subscribers) throws Throwable {
725        return new TestSetup(env, processorBufferSize) {{
726          ManualSubscriber<T> sub1 = newSubscriber();
727          sub1.request(20);
728
729          long totalRequests = expectRequest();
730          final T x = sendNextTFromUpstream();
731          expectNextElement(sub1, x);
732
733          if (totalRequests == 1) {
734            totalRequests += expectRequest();
735          }
736          final T y = sendNextTFromUpstream();
737          expectNextElement(sub1, y);
738
739          if (totalRequests == 2) {
740            totalRequests += expectRequest();
741          }
742
743          final ManualSubscriber<T> sub2 = newSubscriber();
744
745          // sub1 now has 18 pending
746          // sub2 has 0 pending
747
748          if (doesCoordinatedEmission()) {
749            sub2.expectNone(); // since sub2 hasn't requested anything yet
750
751            sub2.request(1);
752
753            final T z = sendNextTFromUpstream();
754            expectNextElement(sub1, z);
755            expectNextElement(sub2, z);
756          } else {
757            final T z = sendNextTFromUpstream();
758            expectNextElement(sub1, z,
759                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
760                            + " at once, please override doesCoordinatedEmission() to return true in this "
761                            + "IdentityProcessorVerification to allow this test to pass.");
762            sub2.expectNone(); // since sub2 hasn't requested anything yet
763
764            sub2.request(1);
765            expectNextElement(sub2, z);
766          }
767          if (totalRequests == 3) {
768            expectRequest();
769          }
770
771          // to avoid error messages during test harness shutdown
772          sendCompletion();
773          sub1.expectCompletion(env.defaultTimeoutMillis());
774          sub2.expectCompletion(env.defaultTimeoutMillis());
775
776          env.verifyNoAsyncErrorsNoDelay();
777        }};
778      }
779    });
780  }
781
782  /////////////////////// TEST INFRASTRUCTURE //////////////////////
783
784  public void notVerified() {
785    publisherVerification.notVerified();
786  }
787
788  public void notVerified(String message) {
789    publisherVerification.notVerified(message);
790  }
791
792  /**
793   * Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
794   */
795  public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
796    if (requiredSubscribersSupport > maxSupportedSubscribers())
797      notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.",
798                                maxSupportedSubscribers(), requiredSubscribersSupport));
799    else body.apply(requiredSubscribersSupport);
800  }
801
802  public abstract class TestSetup extends ManualPublisher<T> {
803    final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
804    private Set<T> seenTees = new HashSet<T>();
805
806    final Processor<T, T> processor;
807
808    public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
809      super(env);
810      tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
811      processor = createIdentityProcessor(testBufferSize);
812      subscribe(processor);
813    }
814
815    public ManualSubscriber<T> newSubscriber() throws InterruptedException {
816      return env.newManualSubscriber(processor);
817    }
818
819    public T nextT() throws InterruptedException {
820      final T t = tees.requestNextElement();
821      if (seenTees.contains(t)) {
822        env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
823      }
824      seenTees.add(t);
825      return t;
826    }
827
828    public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException {
829      final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected));
830      if (!elem.equals(expected)) {
831        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
832      }
833    }
834
835    public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException {
836      final T elem = sub.nextElement(String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum));
837      if (!elem.equals(expected)) {
838        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
839      }
840    }
841
842    public T sendNextTFromUpstream() throws InterruptedException {
843      final T x = nextT();
844      sendNext(x);
845      return x;
846    }
847  }
848
849  public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
850    Promise<Throwable> error;
851
852    public ManualSubscriberWithErrorCollection(TestEnvironment env) {
853      super(env);
854      error = new Promise<Throwable>(env);
855    }
856
857    @Override
858    public void onError(Throwable cause) {
859      error.complete(cause);
860    }
861
862    public void expectError(Throwable cause) throws InterruptedException {
863      expectError(cause, env.defaultTimeoutMillis());
864    }
865
866    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
867    public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
868      error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
869      if (!cause.equals(error.value())) {
870        env.flop(String.format("Expected error %s but got %s", cause, error.value()));
871      }
872    }
873  }
874}