001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams.tck;
006
007import org.reactivestreams.Processor;
008import org.reactivestreams.Publisher;
009import org.reactivestreams.Subscriber;
010import org.reactivestreams.Subscription;
011import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
012import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
013import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
014import org.reactivestreams.tck.TestEnvironment.Promise;
015import org.reactivestreams.tck.flow.support.Function;
016import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
017import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
018import org.testng.annotations.BeforeMethod;
019import org.testng.annotations.Test;
020
021import java.util.HashSet;
022import java.util.Set;
023
024public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
025  implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
026
  // Test environment handed to both delegate verifications and used directly by the
  // processor-specific tests declared in this class.
  private final TestEnvironment env;

  ////////////////////// DELEGATED TO SPECS //////////////////////

  // Delegate that runs the Subscriber (whitebox) rule tests; its factory methods forward
  // back to this class (see the constructor).
  private final SubscriberWhiteboxVerification<T> subscriberVerification;

  // Delegate that runs the Publisher rule tests; its factory and configuration methods
  // forward back to this class (see the constructor).
  private final PublisherVerification<T> publisherVerification;

  ////////////////// END OF DELEGATED TO SPECS //////////////////

  // Number of elements the processor under test must be able to buffer
  // without dropping elements. The constructors that take no buffer size
  // pass `TestEnvironment.TEST_BUFFER_SIZE`.
  private final int processorBufferSize;
042
043  /**
044   * Test class must specify the expected time it takes for the publisher to
045   * shut itself down when the the last downstream {@code Subscription} is cancelled.
046   *
047   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
048   */
049  @SuppressWarnings("unused")
050  public IdentityProcessorVerification(final TestEnvironment env) {
051    this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE);
052  }
053
054  /**
055   * Test class must specify the expected time it takes for the publisher to
056   * shut itself down when the the last downstream {@code Subscription} is cancelled.
057   *
058   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
059   *
060   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
061   */
062  @SuppressWarnings("unused")
063  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
064    this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE);
065  }
066
067  /**
068   * Test class must specify the expected time it takes for the publisher to
069   * shut itself down when the the last downstream {@code Subscription} is cancelled.
070   *
071   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
072   * @param processorBufferSize            number of elements the processor is required to be able to buffer.
073   */
074  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) {
075    this.env = env;
076    this.processorBufferSize = processorBufferSize;
077
078    this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) {
079      @Override
080      public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
081        return IdentityProcessorVerification.this.createSubscriber(probe);
082      }
083
084      @Override public T createElement(int element) {
085        return IdentityProcessorVerification.this.createElement(element);
086      }
087
088      @Override
089      public Publisher<T> createHelperPublisher(long elements) {
090        return IdentityProcessorVerification.this.createHelperPublisher(elements);
091      }
092    };
093
094    publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) {
095      @Override
096      public Publisher<T> createPublisher(long elements) {
097        return IdentityProcessorVerification.this.createPublisher(elements);
098      }
099
100      @Override
101      public Publisher<T> createFailedPublisher() {
102        return IdentityProcessorVerification.this.createFailedPublisher();
103      }
104
105      @Override
106      public long maxElementsFromPublisher() {
107        return IdentityProcessorVerification.this.maxElementsFromPublisher();
108      }
109
110      @Override
111      public long boundedDepthOfOnNextAndRequestRecursion() {
112        return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
113      }
114
115      @Override
116      public boolean skipStochasticTests() {
117        return IdentityProcessorVerification.this.skipStochasticTests();
118      }
119    };
120  }
121
  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a {@link Processor} which simply forwards all stream elements from its upstream
   * to its downstream. It must be able to internally buffer the given number of elements.
   *
   * This class calls it with the buffer size passed to (or defaulted by) the constructor.
   *
   * @param bufferSize number of elements the processor is required to be able to buffer.
   * @return the processor to run the verification against
   */
  public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
130
  /**
   * By implementing this method, additional TCK tests concerning a "failed" publisher will be run.
   *
   * The expected behaviour of the {@link Publisher} returned by this method is to hand out a subscription,
   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
   *
   * If you want to ignore these additional tests, return {@code null} from this method.
   *
   * @return a publisher that fails as described above, or {@code null} to skip the related tests
   */
  public abstract Publisher<T> createFailedPublisher();
140
141  /**
142   * Override and return lower value if your Publisher is only able to produce a known number of elements.
143   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
144   *
145   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
146   *
147   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
148   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
149   */
150  public long maxElementsFromPublisher() {
151    return Long.MAX_VALUE - 1;
152  }
153
154  /**
155   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
156   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
157   * recursive calls to exceed the number returned by this method.
158   *
159   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
160   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
161   */
162  public long boundedDepthOfOnNextAndRequestRecursion() {
163    return 1;
164  }
165
166  /**
167   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
168   * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify—
169   * usually this means that this test case can yield false positives ("be green") even if for some case,
170   * the given implementation may violate the tested behaviour.
171   */
172  public boolean skipStochasticTests() {
173    return false;
174  }
175
176  /**
177   * Describes the tested implementation in terms of how many subscribers they can support.
178   * Some tests require the {@code Publisher} under test to support multiple Subscribers,
179   * yet the spec does not require all publishers to be able to do so, thus – if an implementation
180   * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
181   * you MUST return that number from this method by overriding it.
182   */
183  public long maxSupportedSubscribers() {
184      return Long.MAX_VALUE;
185  }
186
187  /**
188   * Override this method and return {@code true} if the {@link Processor} returned by the
189   * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s
190   * request amounts and only delivers onNext signals if all Subscribers have
191   * indicated (via their Subscription#request(long)) they are ready to receive elements.
192   */
193  public boolean doesCoordinatedEmission() {
194    return false;
195  }
196
197  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
198
199  @BeforeMethod
200  public void setUp() throws Exception {
201    publisherVerification.setUp();
202    subscriberVerification.setUp();
203  }
204
205  ////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
206
207  // A Processor
208  //   must obey all Publisher rules on its publishing side
209  public Publisher<T> createPublisher(long elements) {
210    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
211    final Publisher<T> pub = createHelperPublisher(elements);
212    pub.subscribe(processor);
213    return processor; // we run the PublisherVerification against this
214  }
215
216  @Override @Test
217  public void required_validate_maxElementsFromPublisher() throws Exception {
218    publisherVerification.required_validate_maxElementsFromPublisher();
219  }
220
221  @Override @Test
222  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
223    publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion();
224  }
225
226  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
227  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
228
229  @Test
230  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
231    publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element();
232  }
233
234  @Test
235  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
236    publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements();
237  }
238
239  @Override @Test
240  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
241    publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
242  }
243
244  @Override @Test
245  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
246    publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription();
247  }
248
249  @Override @Test
250  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
251    publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially();
252  }
253
254  @Override @Test
255  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
256    publisherVerification.optional_spec104_mustSignalOnErrorWhenFails();
257  }
258
259  @Override @Test
260  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
261    publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
262  }
263
264  @Override @Test
265  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
266    publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
267  }
268
269  @Override @Test
270  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
271    publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();
272  }
273
274  @Override @Test
275  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
276    publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled();
277  }
278
279  @Override @Test
280  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
281    publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled();
282  }
283
284  @Override @Test
285  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
286    publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals();
287  }
288
289  @Override @Test
290  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
291    publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
292  }
293
294  @Override @Test
295  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
296    publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
297  }
298
299  @Override @Test
300  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
301    publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
302  }
303
304  @Override @Test
305  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
306    publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
307  }
308
309  @Override @Test
310  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
311    publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
312  }
313
314  @Override @Test
315  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
316    publisherVerification.optional_spec111_maySupportMultiSubscribe();
317  }
318
319  @Override @Test
320  public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
321    publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals();
322  }
323
324  @Override @Test
325  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
326    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
327  }
328
329  @Override @Test
330  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
331    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
332  }
333
334  @Override @Test
335  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
336    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
337  }
338
339  @Override @Test
340  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
341    publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe();
342  }
343
344  @Override @Test
345  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
346    publisherVerification.required_spec303_mustNotAllowUnboundedRecursion();
347  }
348
349  @Override @Test
350  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
351    publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations();
352  }
353
354  @Override @Test
355  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
356    publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation();
357  }
358
359  @Override @Test
360  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
361    publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops();
362  }
363
364  @Override @Test
365  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
366    publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops();
367  }
368
369  @Override @Test
370  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
371    publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException();
372  }
373
374  @Override @Test
375  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
376    publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException();
377  }
378  
379  @Override @Test
380  public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
381    publisherVerification.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage();
382  }
383
384  @Override @Test
385  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
386    publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
387  }
388
389  @Override @Test
390  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
391    publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
392  }
393
394  @Override @Test
395  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
396    publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue();
397  }
398
399  @Override @Test
400  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
401    publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
402  }
403
404  @Override @Test
405  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
406    publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
407  }
408
409
410  /**
411   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s
412   * receive the same items and a terminal {@code Exception}.
413   * <p>
414   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
415   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
416   * <p>
417   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple
418   * {@code Subscriber}s.
419   * <p>
420   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
421   * <p>
422   * If this test fails, the following could be checked within the {@code Processor} implementation:
423   * <ul>
424   * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
425   * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
426   * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
427   * both have to request first.</li>
428   * </ul>
429   */
430  @Test
431  public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
432    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
433      @Override
434      public TestSetup apply(Long aLong) throws Throwable {
435        return new TestSetup(env, processorBufferSize) {{
436          final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
437          env.subscribe(processor, sub1);
438
439          final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
440          env.subscribe(processor, sub2);
441
442          final Exception ex = new RuntimeException("Test exception");
443
444          if (doesCoordinatedEmission()) {
445            sub1.request(1);
446            sub2.request(1);
447
448            expectRequest();
449
450            final T x = sendNextTFromUpstream();
451
452            expectNextElement(sub1, x);
453            expectNextElement(sub2, x);
454
455            sub1.request(1);
456            sub2.request(1);
457          } else {
458            sub1.request(1);
459
460            expectRequest(env.defaultTimeoutMillis(),
461                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
462                    + " at once, please override doesCoordinatedEmission() to return true in this "
463                    + "IdentityProcessorVerification to allow this test to pass.");
464
465            final T x = sendNextTFromUpstream();
466            expectNextElement(sub1, x,
467                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
468                            + " at once, please override doesCoordinatedEmission() to return true in this "
469                            + "IdentityProcessorVerification to allow this test to pass.");
470
471            sub1.request(1);
472
473            // sub1 has received one element, and has one demand pending
474            // sub2 has not yet requested anything
475          }
476          sendError(ex);
477
478          sub1.expectError(ex);
479          sub2.expectError(ex);
480
481          env.verifyNoAsyncErrorsNoDelay();
482        }};
483      }
484    });
485  }
486
487  ////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
488  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
489
490  // A Processor
491  //   must obey all Subscriber rules on its consuming side
492  public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) {
493    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
494    processor.subscribe(
495        new Subscriber<T>() {
496          private final Promise<Subscription> subs = new Promise<Subscription>(env);
497
498          @Override
499          public void onSubscribe(final Subscription subscription) {
500            if (env.debugEnabled()) {
501              env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
502            }
503            if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
504
505            probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
506
507              @Override
508              public void triggerRequest(long elements) {
509                subscription.request(elements);
510              }
511
512              @Override
513              public void signalCancel() {
514                subscription.cancel();
515              }
516            });
517          }
518
519          @Override
520          public void onNext(T element) {
521            if (env.debugEnabled()) {
522              env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
523            }
524            probe.registerOnNext(element);
525          }
526
527          @Override
528          public void onComplete() {
529            if (env.debugEnabled()) {
530              env.debug("whiteboxSubscriber::onComplete()");
531            }
532            probe.registerOnComplete();
533          }
534
535          @Override
536          public void onError(Throwable cause) {
537            if (env.debugEnabled()) {
538              env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
539            }
540            probe.registerOnError(cause);
541          }
542        });
543
544    return processor; // we run the SubscriberVerification against this
545  }
546
547  ////////////////////// OTHER RULE VERIFICATION ///////////////////////////
548
549  // A Processor
550  //   must immediately pass on `onError` events received from its upstream to its downstream
551  @Test
552  public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
553    new TestSetup(env, processorBufferSize) {{
554      final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
555      env.subscribe(processor, sub);
556
557      final Exception ex = new RuntimeException("Test exception");
558      sendError(ex);
559      sub.expectError(ex); // "immediately", i.e. without a preceding request
560
561      env.verifyNoAsyncErrorsNoDelay();
562    }};
563  }
564
565  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
566  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
567
568  @Test
569  public void required_exerciseWhiteboxHappyPath() throws Throwable {
570    subscriberVerification.required_exerciseWhiteboxHappyPath();
571  }
572
573  @Override @Test
574  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
575    subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest();
576  }
577
578  @Override @Test
579  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
580    subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch();
581  }
582
583  @Override @Test
584  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
585    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete();
586  }
587
588  @Override @Test
589  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
590    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError();
591  }
592
593  @Override @Test
594  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
595    subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError();
596  }
597
598  @Override @Test
599  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
600    subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
601  }
602
603  @Override @Test
604  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
605    subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid();
606  }
607
608  @Override @Test
609  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
610    subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization();
611  }
612
613  @Override @Test
614  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
615    subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
616  }
617
618  @Override @Test
619  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
620    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
621  }
622
623  @Override @Test
624  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
625    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall();
626  }
627
628  @Override @Test
629  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
630    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall();
631  }
632
633  @Override @Test
634  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
635    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall();
636  }
637
638  @Override @Test
639  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
640    subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
641  }
642
643  @Override @Test
644  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
645    subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation();
646  }
647
648  @Override @Test
649  public void untested_spec213_failingOnSignalInvocation() throws Exception {
650    subscriberVerification.untested_spec213_failingOnSignalInvocation();
651  }
652
653  @Override @Test
654  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
655    subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull();
656  }
657  @Override @Test
658  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
659    subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull();
660  }
661  @Override @Test
662  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
663    subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull();
664  }
665
666  @Override @Test
667  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
668    subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();
669  }
670
671  @Override @Test
672  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
673    subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
674  }
675
676  @Override @Test
677  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
678    subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber();
679  }
680
681  @Override @Test
682  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
683    subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
684  }
685
686  @Override @Test
687  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
688    subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
689  }
690
691  @Override @Test
692  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
693    subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError();
694  }
695
696  @Override @Test
697  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
698    subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber();
699  }
700
701  /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
702
703  /**
704   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests
705   * from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}.
706   * <p>
707   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
708   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
709   * <p>
710   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>2.1</a> with multiple
711   * {@code Subscriber}s.
712   * <p>
713   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
714   * <p>
715   * If this test fails, the following could be checked within the {@code Processor} implementation:
716   * <ul>
717   * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
718   * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
719   * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
720   * both have to request first.</li>
721   * </ul>
722   */
723  @Test
724  public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
725    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
726      @Override
727      public TestSetup apply(Long subscribers) throws Throwable {
728        return new TestSetup(env, processorBufferSize) {{
729          ManualSubscriber<T> sub1 = newSubscriber();
730          sub1.request(20);
731
732          long totalRequests = expectRequest();
733          final T x = sendNextTFromUpstream();
734          expectNextElement(sub1, x);
735
736          if (totalRequests == 1) {
737            totalRequests += expectRequest();
738          }
739          final T y = sendNextTFromUpstream();
740          expectNextElement(sub1, y);
741
742          if (totalRequests == 2) {
743            totalRequests += expectRequest();
744          }
745
746          final ManualSubscriber<T> sub2 = newSubscriber();
747
748          // sub1 now has 18 pending
749          // sub2 has 0 pending
750
751          if (doesCoordinatedEmission()) {
752            sub2.expectNone(); // since sub2 hasn't requested anything yet
753
754            sub2.request(1);
755
756            final T z = sendNextTFromUpstream();
757            expectNextElement(sub1, z);
758            expectNextElement(sub2, z);
759          } else {
760            final T z = sendNextTFromUpstream();
761            expectNextElement(sub1, z,
762                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
763                            + " at once, please override doesCoordinatedEmission() to return true in this "
764                            + "IdentityProcessorVerification to allow this test to pass.");
765            sub2.expectNone(); // since sub2 hasn't requested anything yet
766
767            sub2.request(1);
768            expectNextElement(sub2, z);
769          }
770          if (totalRequests == 3) {
771            expectRequest();
772          }
773
774          // to avoid error messages during test harness shutdown
775          sendCompletion();
776          sub1.expectCompletion(env.defaultTimeoutMillis());
777          sub2.expectCompletion(env.defaultTimeoutMillis());
778
779          env.verifyNoAsyncErrorsNoDelay();
780        }};
781      }
782    });
783  }
784
785  /////////////////////// TEST INFRASTRUCTURE //////////////////////
786
787  public void notVerified() {
788    publisherVerification.notVerified();
789  }
790
791  public void notVerified(String message) {
792    publisherVerification.notVerified(message);
793  }
794
795  /**
796   * Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
797   */
798  public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
799    if (requiredSubscribersSupport > maxSupportedSubscribers())
800      notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.",
801                                maxSupportedSubscribers(), requiredSubscribersSupport));
802    else body.apply(requiredSubscribersSupport);
803  }
804
805  public abstract class TestSetup extends ManualPublisher<T> {
806    final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
807    private Set<T> seenTees = new HashSet<T>();
808
809    final Processor<T, T> processor;
810
811    public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
812      super(env);
813      tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
814      processor = createIdentityProcessor(testBufferSize);
815      subscribe(processor);
816    }
817
818    public ManualSubscriber<T> newSubscriber() throws InterruptedException {
819      return env.newManualSubscriber(processor);
820    }
821
822    public T nextT() throws InterruptedException {
823      final T t = tees.requestNextElement();
824      if (seenTees.contains(t)) {
825        env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
826      }
827      seenTees.add(t);
828      return t;
829    }
830
831    public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException {
832      final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected));
833      if (!elem.equals(expected)) {
834        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
835      }
836    }
837
838    public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException {
839      final T elem = sub.nextElement(String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum));
840      if (!elem.equals(expected)) {
841        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
842      }
843    }
844
845    public T sendNextTFromUpstream() throws InterruptedException {
846      final T x = nextT();
847      sendNext(x);
848      return x;
849    }
850  }
851
852  public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
853    Promise<Throwable> error;
854
855    public ManualSubscriberWithErrorCollection(TestEnvironment env) {
856      super(env);
857      error = new Promise<Throwable>(env);
858    }
859
860    @Override
861    public void onError(Throwable cause) {
862      error.complete(cause);
863    }
864
865    public void expectError(Throwable cause) throws InterruptedException {
866      expectError(cause, env.defaultTimeoutMillis());
867    }
868
869    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
870    public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
871      error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
872      if (!cause.equals(error.value())) {
873        env.flop(String.format("Expected error %s but got %s", cause, error.value()));
874      }
875    }
876  }
877}