001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Processor;
015import org.reactivestreams.Publisher;
016import org.reactivestreams.Subscriber;
017import org.reactivestreams.Subscription;
018import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
021import org.reactivestreams.tck.TestEnvironment.Promise;
022import org.reactivestreams.tck.flow.support.Function;
023import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
024import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
025import org.testng.annotations.BeforeMethod;
026import org.testng.annotations.Test;
027
028import java.util.HashSet;
029import java.util.Set;
030
031public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
032  implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
033
034  private final TestEnvironment env;
035
036  ////////////////////// DELEGATED TO SPECS //////////////////////
037
038  // for delegating tests
039  private final SubscriberWhiteboxVerification<T> subscriberVerification;
040
041  // for delegating tests
042  private final PublisherVerification<T> publisherVerification;
043
044  ////////////////// END OF DELEGATED TO SPECS //////////////////
045
046  // number of elements the processor under test must be able ot buffer,
047  // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`.
048  private final int processorBufferSize;
049
050  /**
051   * Test class must specify the expected time it takes for the publisher to
052   * shut itself down when the the last downstream {@code Subscription} is cancelled.
053   *
054   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
055   */
056  @SuppressWarnings("unused")
057  public IdentityProcessorVerification(final TestEnvironment env) {
058    this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE);
059  }
060
061  /**
062   * Test class must specify the expected time it takes for the publisher to
063   * shut itself down when the the last downstream {@code Subscription} is cancelled.
064   *
065   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
066   *
067   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
068   */
069  @SuppressWarnings("unused")
070  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
071    this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE);
072  }
073
074  /**
075   * Test class must specify the expected time it takes for the publisher to
076   * shut itself down when the the last downstream {@code Subscription} is cancelled.
077   *
078   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
079   * @param processorBufferSize            number of elements the processor is required to be able to buffer.
080   */
081  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) {
082    this.env = env;
083    this.processorBufferSize = processorBufferSize;
084
085    this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) {
086      @Override
087      public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
088        return IdentityProcessorVerification.this.createSubscriber(probe);
089      }
090
091      @Override public T createElement(int element) {
092        return IdentityProcessorVerification.this.createElement(element);
093      }
094
095      @Override
096      public Publisher<T> createHelperPublisher(long elements) {
097        return IdentityProcessorVerification.this.createHelperPublisher(elements);
098      }
099    };
100
101    publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) {
102      @Override
103      public Publisher<T> createPublisher(long elements) {
104        return IdentityProcessorVerification.this.createPublisher(elements);
105      }
106
107      @Override
108      public Publisher<T> createFailedPublisher() {
109        return IdentityProcessorVerification.this.createFailedPublisher();
110      }
111
112      @Override
113      public long maxElementsFromPublisher() {
114        return IdentityProcessorVerification.this.maxElementsFromPublisher();
115      }
116
117      @Override
118      public long boundedDepthOfOnNextAndRequestRecursion() {
119        return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
120      }
121
122      @Override
123      public boolean skipStochasticTests() {
124        return IdentityProcessorVerification.this.skipStochasticTests();
125      }
126    };
127  }
128
129  /**
130   * This is the main method you must implement in your test incarnation.
131   * It must create a {@link Processor}, which simply forwards all stream elements from its upstream
132   * to its downstream. It must be able to internally buffer the given number of elements.
133   *
134   * @param bufferSize number of elements the processor is required to be able to buffer.
135   */
136  public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
137
138  /**
139   * By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
140   *
141   * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription,
142   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
143   *
144   * If you want to ignore these additional tests, return {@code null} from this method.
145   */
146  public abstract Publisher<T> createFailedPublisher();
147
148  /**
149   * Override and return lower value if your Publisher is only able to produce a known number of elements.
150   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
151   *
152   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
153   *
154   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
155   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
156   */
157  public long maxElementsFromPublisher() {
158    return Long.MAX_VALUE - 1;
159  }
160
161  /**
162   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
163   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
164   * recursive calls to exceed the number returned by this method.
165   *
166   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
167   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
168   */
169  public long boundedDepthOfOnNextAndRequestRecursion() {
170    return 1;
171  }
172
173  /**
174   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
175   * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify—
176   * usually this means that this test case can yield false positives ("be green") even if for some case,
177   * the given implementation may violate the tested behaviour.
178   */
179  public boolean skipStochasticTests() {
180    return false;
181  }
182
183  /**
184   * Describes the tested implementation in terms of how many subscribers they can support.
185   * Some tests require the {@code Publisher} under test to support multiple Subscribers,
186   * yet the spec does not require all publishers to be able to do so, thus – if an implementation
187   * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
188   * you MUST return that number from this method by overriding it.
189   */
190  public long maxSupportedSubscribers() {
191      return Long.MAX_VALUE;
192  }
193
194  /**
195   * Override this method and return {@code true} if the {@link Processor} returned by the
196   * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s
197   * request amounts and only delivers onNext signals if all Subscribers have
198   * indicated (via their Subscription#request(long)) they are ready to receive elements.
199   */
200  public boolean doesCoordinatedEmission() {
201    return false;
202  }
203
204  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
205
206  @BeforeMethod
207  public void setUp() throws Exception {
208    publisherVerification.setUp();
209    subscriberVerification.setUp();
210  }
211
212  ////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
213
214  // A Processor
215  //   must obey all Publisher rules on its publishing side
216  public Publisher<T> createPublisher(long elements) {
217    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
218    final Publisher<T> pub = createHelperPublisher(elements);
219    pub.subscribe(processor);
220    return processor; // we run the PublisherVerification against this
221  }
222
223  @Override @Test
224  public void required_validate_maxElementsFromPublisher() throws Exception {
225    publisherVerification.required_validate_maxElementsFromPublisher();
226  }
227
228  @Override @Test
229  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
230    publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion();
231  }
232
233  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
234  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
235
236  @Test
237  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
238    publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element();
239  }
240
241  @Test
242  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
243    publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements();
244  }
245
246  @Override @Test
247  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
248    publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
249  }
250
251  @Override @Test
252  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
253    publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription();
254  }
255
256  @Override @Test
257  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
258    publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially();
259  }
260
261  @Override @Test
262  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
263    publisherVerification.optional_spec104_mustSignalOnErrorWhenFails();
264  }
265
266  @Override @Test
267  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
268    publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
269  }
270
271  @Override @Test
272  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
273    publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
274  }
275
276  @Override @Test
277  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
278    publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();
279  }
280
281  @Override @Test
282  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
283    publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled();
284  }
285
286  @Override @Test
287  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
288    publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled();
289  }
290
291  @Override @Test
292  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
293    publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals();
294  }
295
296  @Override @Test
297  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
298    publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
299  }
300
301  @Override @Test
302  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
303    publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
304  }
305
306  @Override @Test
307  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
308    publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
309  }
310
311  @Override @Test
312  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
313    publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
314  }
315
316  @Override @Test
317  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
318    publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
319  }
320
321  @Override @Test
322  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
323    publisherVerification.optional_spec111_maySupportMultiSubscribe();
324  }
325
326  @Override @Test
327  public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
328    publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals();
329  }
330
331  @Override @Test
332  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
333    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
334  }
335
336  @Override @Test
337  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
338    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
339  }
340
341  @Override @Test
342  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
343    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
344  }
345
346  @Override @Test
347  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
348    publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe();
349  }
350
351  @Override @Test
352  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
353    publisherVerification.required_spec303_mustNotAllowUnboundedRecursion();
354  }
355
356  @Override @Test
357  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
358    publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations();
359  }
360
361  @Override @Test
362  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
363    publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation();
364  }
365
366  @Override @Test
367  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
368    publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops();
369  }
370
371  @Override @Test
372  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
373    publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops();
374  }
375
376  @Override @Test
377  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
378    publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException();
379  }
380
381  @Override @Test
382  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
383    publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException();
384  }
385  
386  @Override @Test
387  public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
388    publisherVerification.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage();
389  }
390
391  @Override @Test
392  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
393    publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
394  }
395
396  @Override @Test
397  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
398    publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
399  }
400
401  @Override @Test
402  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
403    publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue();
404  }
405
406  @Override @Test
407  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
408    publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
409  }
410
411  @Override @Test
412  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
413    publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
414  }
415
416
417  /**
418   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s
419   * receive the same items and a terminal {@code Exception}.
420   * <p>
421   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
422   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
423   * <p>
424   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple
425   * {@code Subscriber}s.
426   * <p>
427   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
428   * <p>
429   * If this test fails, the following could be checked within the {@code Processor} implementation:
430   * <ul>
431   * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
432   * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
433   * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
434   * both have to request first.</li>
435   * </ul>
436   */
437  @Test
438  public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
439    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
440      @Override
441      public TestSetup apply(Long aLong) throws Throwable {
442        return new TestSetup(env, processorBufferSize) {{
443          final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
444          env.subscribe(processor, sub1);
445
446          final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
447          env.subscribe(processor, sub2);
448
449          final Exception ex = new RuntimeException("Test exception");
450
451          if (doesCoordinatedEmission()) {
452            sub1.request(1);
453            sub2.request(1);
454
455            expectRequest();
456
457            final T x = sendNextTFromUpstream();
458
459            expectNextElement(sub1, x);
460            expectNextElement(sub2, x);
461
462            sub1.request(1);
463            sub2.request(1);
464          } else {
465            sub1.request(1);
466
467            expectRequest(env.defaultTimeoutMillis(),
468                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
469                    + " at once, please override doesCoordinatedEmission() to return true in this "
470                    + "IdentityProcessorVerification to allow this test to pass.");
471
472            final T x = sendNextTFromUpstream();
473            expectNextElement(sub1, x,
474                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
475                            + " at once, please override doesCoordinatedEmission() to return true in this "
476                            + "IdentityProcessorVerification to allow this test to pass.");
477
478            sub1.request(1);
479
480            // sub1 has received one element, and has one demand pending
481            // sub2 has not yet requested anything
482          }
483          sendError(ex);
484
485          sub1.expectError(ex);
486          sub2.expectError(ex);
487
488          env.verifyNoAsyncErrorsNoDelay();
489        }};
490      }
491    });
492  }
493
494  ////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
495  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
496
497  // A Processor
498  //   must obey all Subscriber rules on its consuming side
499  public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) {
500    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
501    processor.subscribe(
502        new Subscriber<T>() {
503          private final Promise<Subscription> subs = new Promise<Subscription>(env);
504
505          @Override
506          public void onSubscribe(final Subscription subscription) {
507            if (env.debugEnabled()) {
508              env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
509            }
510            if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
511
512            probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
513
514              @Override
515              public void triggerRequest(long elements) {
516                subscription.request(elements);
517              }
518
519              @Override
520              public void signalCancel() {
521                subscription.cancel();
522              }
523            });
524          }
525
526          @Override
527          public void onNext(T element) {
528            if (env.debugEnabled()) {
529              env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
530            }
531            probe.registerOnNext(element);
532          }
533
534          @Override
535          public void onComplete() {
536            if (env.debugEnabled()) {
537              env.debug("whiteboxSubscriber::onComplete()");
538            }
539            probe.registerOnComplete();
540          }
541
542          @Override
543          public void onError(Throwable cause) {
544            if (env.debugEnabled()) {
545              env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
546            }
547            probe.registerOnError(cause);
548          }
549        });
550
551    return processor; // we run the SubscriberVerification against this
552  }
553
554  ////////////////////// OTHER RULE VERIFICATION ///////////////////////////
555
556  // A Processor
557  //   must immediately pass on `onError` events received from its upstream to its downstream
558  @Test
559  public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
560    new TestSetup(env, processorBufferSize) {{
561      final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
562      env.subscribe(processor, sub);
563
564      final Exception ex = new RuntimeException("Test exception");
565      sendError(ex);
566      sub.expectError(ex); // "immediately", i.e. without a preceding request
567
568      env.verifyNoAsyncErrorsNoDelay();
569    }};
570  }
571
572  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
573  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
574
575  @Test
576  public void required_exerciseWhiteboxHappyPath() throws Throwable {
577    subscriberVerification.required_exerciseWhiteboxHappyPath();
578  }
579
580  @Override @Test
581  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
582    subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest();
583  }
584
585  @Override @Test
586  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
587    subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch();
588  }
589
590  @Override @Test
591  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
592    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete();
593  }
594
595  @Override @Test
596  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
597    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError();
598  }
599
600  @Override @Test
601  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
602    subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError();
603  }
604
605  @Override @Test
606  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
607    subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
608  }
609
610  @Override @Test
611  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
612    subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid();
613  }
614
615  @Override @Test
616  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
617    subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization();
618  }
619
620  @Override @Test
621  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
622    subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
623  }
624
625  @Override @Test
626  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
627    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
628  }
629
630  @Override @Test
631  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
632    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall();
633  }
634
635  @Override @Test
636  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
637    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall();
638  }
639
640  @Override @Test
641  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
642    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall();
643  }
644
645  @Override @Test
646  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
647    subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
648  }
649
650  @Override @Test
651  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
652    subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation();
653  }
654
655  @Override @Test
656  public void untested_spec213_failingOnSignalInvocation() throws Exception {
657    subscriberVerification.untested_spec213_failingOnSignalInvocation();
658  }
659
660  @Override @Test
661  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
662    subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull();
663  }
664  @Override @Test
665  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
666    subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull();
667  }
668  @Override @Test
669  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
670    subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull();
671  }
672
673  @Override @Test
674  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
675    subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();
676  }
677
678  @Override @Test
679  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
680    subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
681  }
682
683  @Override @Test
684  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
685    subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber();
686  }
687
688  @Override @Test
689  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
690    subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
691  }
692
693  @Override @Test
694  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
695    subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
696  }
697
698  @Override @Test
699  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
700    subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError();
701  }
702
703  @Override @Test
704  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
705    subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber();
706  }
707
708  /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
709
710  /**
711   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests
712   * from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}.
713   * <p>
714   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
715   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
716   * <p>
717   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>2.1</a> with multiple
718   * {@code Subscriber}s.
719   * <p>
720   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
721   * <p>
722   * If this test fails, the following could be checked within the {@code Processor} implementation:
723   * <ul>
724   * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
725   * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
726   * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
727   * both have to request first.</li>
728   * </ul>
729   */
730  @Test
731  public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
732    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
733      @Override
734      public TestSetup apply(Long subscribers) throws Throwable {
735        return new TestSetup(env, processorBufferSize) {{
736          ManualSubscriber<T> sub1 = newSubscriber();
737          sub1.request(20);
738
739          long totalRequests = expectRequest();
740          final T x = sendNextTFromUpstream();
741          expectNextElement(sub1, x);
742
743          if (totalRequests == 1) {
744            totalRequests += expectRequest();
745          }
746          final T y = sendNextTFromUpstream();
747          expectNextElement(sub1, y);
748
749          if (totalRequests == 2) {
750            totalRequests += expectRequest();
751          }
752
753          final ManualSubscriber<T> sub2 = newSubscriber();
754
755          // sub1 now has 18 pending
756          // sub2 has 0 pending
757
758          if (doesCoordinatedEmission()) {
759            sub2.expectNone(); // since sub2 hasn't requested anything yet
760
761            sub2.request(1);
762
763            final T z = sendNextTFromUpstream();
764            expectNextElement(sub1, z);
765            expectNextElement(sub2, z);
766          } else {
767            final T z = sendNextTFromUpstream();
768            expectNextElement(sub1, z,
769                    "If the Processor coordinates requests/emissions when having multiple Subscribers"
770                            + " at once, please override doesCoordinatedEmission() to return true in this "
771                            + "IdentityProcessorVerification to allow this test to pass.");
772            sub2.expectNone(); // since sub2 hasn't requested anything yet
773
774            sub2.request(1);
775            expectNextElement(sub2, z);
776          }
777          if (totalRequests == 3) {
778            expectRequest();
779          }
780
781          // to avoid error messages during test harness shutdown
782          sendCompletion();
783          sub1.expectCompletion(env.defaultTimeoutMillis());
784          sub2.expectCompletion(env.defaultTimeoutMillis());
785
786          env.verifyNoAsyncErrorsNoDelay();
787        }};
788      }
789    });
790  }
791
792  /////////////////////// TEST INFRASTRUCTURE //////////////////////
793
794  public void notVerified() {
795    publisherVerification.notVerified();
796  }
797
798  public void notVerified(String message) {
799    publisherVerification.notVerified(message);
800  }
801
802  /**
803   * Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
804   */
805  public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
806    if (requiredSubscribersSupport > maxSupportedSubscribers())
807      notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.",
808                                maxSupportedSubscribers(), requiredSubscribersSupport));
809    else body.apply(requiredSubscribersSupport);
810  }
811
812  public abstract class TestSetup extends ManualPublisher<T> {
813    final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
814    private Set<T> seenTees = new HashSet<T>();
815
816    final Processor<T, T> processor;
817
818    public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
819      super(env);
820      tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
821      processor = createIdentityProcessor(testBufferSize);
822      subscribe(processor);
823    }
824
825    public ManualSubscriber<T> newSubscriber() throws InterruptedException {
826      return env.newManualSubscriber(processor);
827    }
828
829    public T nextT() throws InterruptedException {
830      final T t = tees.requestNextElement();
831      if (seenTees.contains(t)) {
832        env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
833      }
834      seenTees.add(t);
835      return t;
836    }
837
838    public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException {
839      final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected));
840      if (!elem.equals(expected)) {
841        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
842      }
843    }
844
845    public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException {
846      final T elem = sub.nextElement(String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum));
847      if (!elem.equals(expected)) {
848        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
849      }
850    }
851
852    public T sendNextTFromUpstream() throws InterruptedException {
853      final T x = nextT();
854      sendNext(x);
855      return x;
856    }
857  }
858
859  public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
860    Promise<Throwable> error;
861
862    public ManualSubscriberWithErrorCollection(TestEnvironment env) {
863      super(env);
864      error = new Promise<Throwable>(env);
865    }
866
867    @Override
868    public void onError(Throwable cause) {
869      error.complete(cause);
870    }
871
872    public void expectError(Throwable cause) throws InterruptedException {
873      expectError(cause, env.defaultTimeoutMillis());
874    }
875
876    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
877    public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
878      error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
879      if (!cause.equals(error.value())) {
880        env.flop(String.format("Expected error %s but got %s", cause, error.value()));
881      }
882    }
883  }
884}