001package org.reactivestreams.tck;
002
003import org.reactivestreams.Processor;
004import org.reactivestreams.Publisher;
005import org.reactivestreams.Subscriber;
006import org.reactivestreams.Subscription;
007import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
008import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
009import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
010import org.reactivestreams.tck.TestEnvironment.Promise;
011import org.reactivestreams.tck.support.Function;
012import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
013import org.reactivestreams.tck.support.PublisherVerificationRules;
014import org.testng.annotations.BeforeMethod;
015import org.testng.annotations.Test;
016
017import java.util.HashSet;
018import java.util.Set;
019
020public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
021  implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
022
023  private final TestEnvironment env;
024
025  ////////////////////// DELEGATED TO SPECS //////////////////////
026
027  // for delegating tests
028  private final SubscriberWhiteboxVerification<T> subscriberVerification;
029
030  // for delegating tests
031  private final PublisherVerification<T> publisherVerification;
032
033  ////////////////// END OF DELEGATED TO SPECS //////////////////
034
035  // number of elements the processor under test must be able ot buffer,
036  // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`.
037  private final int processorBufferSize;
038
039  /**
040   * Test class must specify the expected time it takes for the publisher to
041   * shut itself down when the the last downstream {@code Subscription} is cancelled.
042   *
043   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
044   */
045  @SuppressWarnings("unused")
046  public IdentityProcessorVerification(final TestEnvironment env) {
047    this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE);
048  }
049
050  /**
051   * Test class must specify the expected time it takes for the publisher to
052   * shut itself down when the the last downstream {@code Subscription} is cancelled.
053   *
054   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
055   *
056   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
057   */
058  @SuppressWarnings("unused")
059  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
060    this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE);
061  }
062
063  /**
064   * Test class must specify the expected time it takes for the publisher to
065   * shut itself down when the the last downstream {@code Subscription} is cancelled.
066   *
067   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
068   * @param processorBufferSize            number of elements the processor is required to be able to buffer.
069   */
070  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) {
071    this.env = env;
072    this.processorBufferSize = processorBufferSize;
073
074    this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) {
075      @Override
076      public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
077        return IdentityProcessorVerification.this.createSubscriber(probe);
078      }
079
080      @Override public T createElement(int element) {
081        return IdentityProcessorVerification.this.createElement(element);
082      }
083
084      @Override
085      public Publisher<T> createHelperPublisher(long elements) {
086        return IdentityProcessorVerification.this.createHelperPublisher(elements);
087      }
088    };
089
090    publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) {
091      @Override
092      public Publisher<T> createPublisher(long elements) {
093        return IdentityProcessorVerification.this.createPublisher(elements);
094      }
095
096      @Override
097      public Publisher<T> createFailedPublisher() {
098        return IdentityProcessorVerification.this.createFailedPublisher();
099      }
100
101      @Override
102      public long maxElementsFromPublisher() {
103        return IdentityProcessorVerification.this.maxElementsFromPublisher();
104      }
105
106      @Override
107      public long boundedDepthOfOnNextAndRequestRecursion() {
108        return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
109      }
110
111      @Override
112      public boolean skipStochasticTests() {
113        return IdentityProcessorVerification.this.skipStochasticTests();
114      }
115    };
116  }
117
118  /**
119   * This is the main method you must implement in your test incarnation.
120   * It must create a Publisher, which simply forwards all stream elements from its upstream
121   * to its downstream. It must be able to internally buffer the given number of elements.
122   *
123   * @param bufferSize number of elements the processor is required to be able to buffer.
124   */
125  public abstract Processor<T, T> createIdentityProcessor(int bufferSize);
126
127  /**
128   * By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
129   *
130   * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription,
131   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
132   *
133   * If you ignore these additional tests, return {@code null} from this method.
134   */
135  public abstract Publisher<T> createFailedPublisher();
136
137  /**
138   * Override and return lower value if your Publisher is only able to produce a known number of elements.
139   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
140   *
141   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements.
142   *
143   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
144   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
145   */
146  public long maxElementsFromPublisher() {
147    return Long.MAX_VALUE - 1;
148  }
149
150  /**
151   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
152   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
153   * recursive calls to exceed the number returned by this method.
154   *
155   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
156   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
157   */
158  public long boundedDepthOfOnNextAndRequestRecursion() {
159    return 1;
160  }
161
162  /**
163   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
164   * Such tests MAY sometimes fail even though the impl
165   */
166  public boolean skipStochasticTests() {
167    return false;
168  }
169
170  /**
171   * Describes the tested implementation in terms of how many subscribers they can support.
172   * Some tests require the {@code Publisher} under test to support multiple Subscribers,
173   * yet the spec does not require all publishers to be able to do so, thus – if an implementation
174   * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
175   * you MUST return that number from this method by overriding it.
176   */
177  public long maxSupportedSubscribers() {
178      return Long.MAX_VALUE;
179  }
180
181  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
182
183  @BeforeMethod
184  public void setUp() throws Exception {
185    publisherVerification.setUp();
186    subscriberVerification.setUp();
187  }
188
189  ////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
190
191  // A Processor
192  //   must obey all Publisher rules on its publishing side
193  public Publisher<T> createPublisher(long elements) {
194    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
195    final Publisher<T> pub = createHelperPublisher(elements);
196    pub.subscribe(processor);
197    return processor; // we run the PublisherVerification against this
198  }
199
200  @Override @Test
201  public void required_validate_maxElementsFromPublisher() throws Exception {
202    publisherVerification.required_validate_maxElementsFromPublisher();
203  }
204
205  @Override @Test
206  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
207    publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion();
208  }
209
210  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
211  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
212
213  @Test
214  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
215    publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element();
216  }
217
218  @Test
219  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
220    publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements();
221  }
222
223  @Override @Test
224  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
225    publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
226  }
227
228  @Override @Test
229  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
230    publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription();
231  }
232
233  @Override @Test
234  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
235    publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially();
236  }
237
238  @Override @Test
239  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
240    publisherVerification.optional_spec104_mustSignalOnErrorWhenFails();
241  }
242
243  @Override @Test
244  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
245    publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
246  }
247
248  @Override @Test
249  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
250    publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
251  }
252
253  @Override @Test
254  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
255    publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();
256  }
257
258  @Override @Test
259  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
260    publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled();
261  }
262
263  @Override @Test
264  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
265    publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled();
266  }
267
268  @Override @Test
269  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
270    publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals();
271  }
272
273  @Override @Test
274  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
275    publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
276  }
277
278  @Override @Test
279  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
280    publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
281  }
282
283  @Override @Test
284  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
285    publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
286  }
287
288  @Override @Test
289  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
290    publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
291  }
292
293  @Override @Test
294  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
295    publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
296  }
297
298  @Override @Test
299  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
300    publisherVerification.optional_spec111_maySupportMultiSubscribe();
301  }
302
303  @Override @Test
304  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
305    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
306  }
307
308  @Override @Test
309  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
310    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
311  }
312
313  @Override @Test
314  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
315    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
316  }
317
318  @Override @Test
319  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
320    publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe();
321  }
322
323  @Override @Test
324  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
325    publisherVerification.required_spec303_mustNotAllowUnboundedRecursion();
326  }
327
328  @Override @Test
329  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
330    publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations();
331  }
332
333  @Override @Test
334  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception {
335    publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation();
336  }
337
338  @Override @Test
339  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
340    publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops();
341  }
342
343  @Override @Test
344  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
345    publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops();
346  }
347
348  @Override @Test
349  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
350    publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException();
351  }
352
353  @Override @Test
354  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
355    publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException();
356  }
357
358  @Override @Test
359  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
360    publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
361  }
362
363  @Override @Test
364  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
365    publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
366  }
367
368  @Override @Test
369  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
370    publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue();
371  }
372
373  @Override @Test
374  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
375    publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
376  }
377
378  @Override @Test
379  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
380    publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
381  }
382
383  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4
384  // for multiple subscribers
385  @Test
386  public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
387    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
388      @Override
389      public TestSetup apply(Long aLong) throws Throwable {
390        return new TestSetup(env, processorBufferSize) {{
391          final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
392          env.subscribe(processor, sub1);
393
394          final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
395          env.subscribe(processor, sub2);
396
397          sub1.request(1);
398          expectRequest();
399          final T x = sendNextTFromUpstream();
400          expectNextElement(sub1, x);
401          sub1.request(1);
402
403          // sub1 has received one element, and has one demand pending
404          // sub2 has not yet requested anything
405
406          final Exception ex = new RuntimeException("Test exception");
407          sendError(ex);
408          sub1.expectError(ex);
409          sub2.expectError(ex);
410
411          env.verifyNoAsyncErrorsNoDelay();
412        }};
413      }
414    });
415  }
416
417  ////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
418  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
419
420  // A Processor
421  //   must obey all Subscriber rules on its consuming side
422  public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) {
423    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
424    processor.subscribe(
425        new Subscriber<T>() {
426          private final Promise<Subscription> subs = new Promise<Subscription>(env);
427
428          @Override
429          public void onSubscribe(final Subscription subscription) {
430            env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
431            if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
432
433            probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
434
435              @Override
436              public void triggerRequest(long elements) {
437                subscription.request(elements);
438              }
439
440              @Override
441              public void signalCancel() {
442                subscription.cancel();
443              }
444            });
445          }
446
447          @Override
448          public void onNext(T element) {
449            env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
450            probe.registerOnNext(element);
451          }
452
453          @Override
454          public void onComplete() {
455            env.debug("whiteboxSubscriber::onComplete()");
456            probe.registerOnComplete();
457          }
458
459          @Override
460          public void onError(Throwable cause) {
461            env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
462            probe.registerOnError(cause);
463          }
464        });
465
466    return processor; // we run the SubscriberVerification against this
467  }
468
469  ////////////////////// OTHER RULE VERIFICATION ///////////////////////////
470
471  // A Processor
472  //   must immediately pass on `onError` events received from its upstream to its downstream
473  @Test
474  public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
475    new TestSetup(env, processorBufferSize) {{
476      final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
477      env.subscribe(processor, sub);
478
479      final Exception ex = new RuntimeException("Test exception");
480      sendError(ex);
481      sub.expectError(ex); // "immediately", i.e. without a preceding request
482
483      env.verifyNoAsyncErrorsNoDelay();
484    }};
485  }
486
487  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
488  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1
489
490  @Test
491  public void required_exerciseWhiteboxHappyPath() throws Throwable {
492    subscriberVerification.required_exerciseWhiteboxHappyPath();
493  }
494
495  @Override @Test
496  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
497    subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest();
498  }
499
500  @Override @Test
501  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
502    subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch();
503  }
504
505  @Override @Test
506  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
507    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete();
508  }
509
510  @Override @Test
511  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
512    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError();
513  }
514
515  @Override @Test
516  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
517    subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError();
518  }
519
520  @Override @Test
521  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
522    subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
523  }
524
525  @Override @Test
526  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
527    subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid();
528  }
529
530  @Override @Test
531  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
532    subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization();
533  }
534
535  @Override @Test
536  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
537    subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
538  }
539
540  @Override @Test
541  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
542    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
543  }
544
545  @Override @Test
546  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
547    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall();
548  }
549
550  @Override @Test
551  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
552    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall();
553  }
554
555  @Override @Test
556  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
557    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall();
558  }
559
560  @Override @Test
561  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
562    subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
563  }
564
565  @Override @Test
566  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
567    subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation();
568  }
569
570  @Override @Test
571  public void untested_spec213_failingOnSignalInvocation() throws Exception {
572    subscriberVerification.untested_spec213_failingOnSignalInvocation();
573  }
574
575  @Override @Test
576  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
577    subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull();
578  }
579  @Override @Test
580  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
581    subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull();
582  }
583  @Override @Test
584  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
585    subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull();
586  }
587
588  @Override @Test
589  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
590    subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();
591  }
592
593  @Override @Test
594  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
595    subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
596  }
597
598  @Override @Test
599  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
600    subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber();
601  }
602
603  @Override @Test
604  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
605    subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
606  }
607
608  @Override @Test
609  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
610    subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
611  }
612
613  @Override @Test
614  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
615    subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError();
616  }
617
618  @Override @Test
619  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
620    subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber();
621  }
622
623  /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
624
625  // A Processor
626  //   must trigger `requestFromUpstream` for elements that have been requested 'long ago'
627  @Test
628  public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
629    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
630      @Override
631      public TestSetup apply(Long subscribers) throws Throwable {
632        return new TestSetup(env, processorBufferSize) {{
633          ManualSubscriber<T> sub1 = newSubscriber();
634          sub1.request(20);
635
636          long totalRequests = expectRequest();
637          final T x = sendNextTFromUpstream();
638          expectNextElement(sub1, x);
639
640          if (totalRequests == 1) {
641            totalRequests += expectRequest();
642          }
643          final T y = sendNextTFromUpstream();
644          expectNextElement(sub1, y);
645
646          if (totalRequests == 2) {
647            totalRequests += expectRequest();
648          }
649
650          final ManualSubscriber<T> sub2 = newSubscriber();
651
652          // sub1 now has 18 pending
653          // sub2 has 0 pending
654
655          final T z = sendNextTFromUpstream();
656          expectNextElement(sub1, z);
657          sub2.expectNone(); // since sub2 hasn't requested anything yet
658
659          sub2.request(1);
660          expectNextElement(sub2, z);
661
662          if (totalRequests == 3) {
663            expectRequest();
664          }
665
666          // to avoid error messages during test harness shutdown
667          sendCompletion();
668          sub1.expectCompletion(env.defaultTimeoutMillis());
669          sub2.expectCompletion(env.defaultTimeoutMillis());
670
671          env.verifyNoAsyncErrorsNoDelay();
672        }};
673      }
674    });
675  }
676
677  /////////////////////// TEST INFRASTRUCTURE //////////////////////
678
679  public void notVerified() {
680    publisherVerification.notVerified();
681  }
682
683  public void notVerified(String message) {
684    publisherVerification.notVerified(message);
685  }
686
687  /**
688   * Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
689   */
690  public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
691    if (requiredSubscribersSupport > maxSupportedSubscribers())
692      notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.",
693                                maxSupportedSubscribers(), requiredSubscribersSupport));
694    else body.apply(requiredSubscribersSupport);
695  }
696
697  public abstract class TestSetup extends ManualPublisher<T> {
698    final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
699    private Set<T> seenTees = new HashSet<T>();
700
701    final Processor<T, T> processor;
702
703    public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
704      super(env);
705      tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
706      processor = createIdentityProcessor(testBufferSize);
707      subscribe(processor);
708    }
709
710    public ManualSubscriber<T> newSubscriber() throws InterruptedException {
711      return env.newManualSubscriber(processor);
712    }
713
714    public T nextT() throws InterruptedException {
715      final T t = tees.requestNextElement();
716      if (seenTees.contains(t)) {
717        env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
718      }
719      seenTees.add(t);
720      return t;
721    }
722
723    public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException {
724      final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected));
725      if (!elem.equals(expected)) {
726        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
727      }
728    }
729
730    public T sendNextTFromUpstream() throws InterruptedException {
731      final T x = nextT();
732      sendNext(x);
733      return x;
734    }
735  }
736
737  public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
738    Promise<Throwable> error;
739
740    public ManualSubscriberWithErrorCollection(TestEnvironment env) {
741      super(env);
742      error = new Promise<Throwable>(env);
743    }
744
745    @Override
746    public void onError(Throwable cause) {
747      error.complete(cause);
748    }
749
750    public void expectError(Throwable cause) throws InterruptedException {
751      expectError(cause, env.defaultTimeoutMillis());
752    }
753
754    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
755    public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
756      error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
757      if (!cause.equals(error.value())) {
758        env.flop(String.format("Expected error %s but got %s", cause, error.value()));
759      }
760    }
761  }
762}