001package org.reactivestreams.tck; 002 003import org.reactivestreams.Processor; 004import org.reactivestreams.Publisher; 005import org.reactivestreams.Subscriber; 006import org.reactivestreams.Subscription; 007import org.reactivestreams.tck.TestEnvironment.ManualPublisher; 008import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 009import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; 010import org.reactivestreams.tck.TestEnvironment.Promise; 011import org.reactivestreams.tck.support.Function; 012import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules; 013import org.reactivestreams.tck.support.PublisherVerificationRules; 014import org.testng.annotations.BeforeMethod; 015import org.testng.annotations.Test; 016 017import java.util.HashSet; 018import java.util.Set; 019 020public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T> 021 implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { 022 023 private final TestEnvironment env; 024 025 ////////////////////// DELEGATED TO SPECS ////////////////////// 026 027 // for delegating tests 028 private final SubscriberWhiteboxVerification<T> subscriberVerification; 029 030 // for delegating tests 031 private final PublisherVerification<T> publisherVerification; 032 033 ////////////////// END OF DELEGATED TO SPECS ////////////////// 034 035 // number of elements the processor under test must be able ot buffer, 036 // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`. 037 private final int processorBufferSize; 038 039 /** 040 * Test class must specify the expected time it takes for the publisher to 041 * shut itself down when the the last downstream {@code Subscription} is cancelled. 042 * 043 * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. 044 */ 045 @SuppressWarnings("unused") 046 public IdentityProcessorVerification(final TestEnvironment env) { 047 this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE); 048 } 049 050 /** 051 * Test class must specify the expected time it takes for the publisher to 052 * shut itself down when the the last downstream {@code Subscription} is cancelled. 053 * 054 * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. 055 * 056 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 057 */ 058 @SuppressWarnings("unused") 059 public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) { 060 this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE); 061 } 062 063 /** 064 * Test class must specify the expected time it takes for the publisher to 065 * shut itself down when the the last downstream {@code Subscription} is cancelled. 066 * 067 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 068 * @param processorBufferSize number of elements the processor is required to be able to buffer. 069 */ 070 public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) { 071 this.env = env; 072 this.processorBufferSize = processorBufferSize; 073 074 this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) { 075 @Override 076 public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) { 077 return IdentityProcessorVerification.this.createSubscriber(probe); 078 } 079 080 @Override public T createElement(int element) { 081 return IdentityProcessorVerification.this.createElement(element); 082 } 083 084 @Override 085 public Publisher<T> createHelperPublisher(long elements) { 086 return IdentityProcessorVerification.this.createHelperPublisher(elements); 087 } 088 }; 089 090 publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) { 091 @Override 092 public Publisher<T> createPublisher(long elements) { 093 return IdentityProcessorVerification.this.createPublisher(elements); 094 } 095 096 @Override 097 public Publisher<T> createFailedPublisher() { 098 return IdentityProcessorVerification.this.createFailedPublisher(); 099 } 100 101 @Override 102 public long maxElementsFromPublisher() { 103 return IdentityProcessorVerification.this.maxElementsFromPublisher(); 104 } 105 106 @Override 107 public long boundedDepthOfOnNextAndRequestRecursion() { 108 return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion(); 109 } 110 111 @Override 112 public boolean skipStochasticTests() { 113 return IdentityProcessorVerification.this.skipStochasticTests(); 114 } 115 }; 116 } 117 118 /** 119 * This is the main method you must implement in your test incarnation. 120 * It must create a Publisher, which simply forwards all stream elements from its upstream 121 * to its downstream. It must be able to internally buffer the given number of elements. 122 * 123 * @param bufferSize number of elements the processor is required to be able to buffer. 124 */ 125 public abstract Processor<T, T> createIdentityProcessor(int bufferSize); 126 127 /** 128 * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. 129 * 130 * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, 131 * followed by signalling {@code onError} on it, as specified by Rule 1.9. 132 * 133 * If you ignore these additional tests, return {@code null} from this method. 134 */ 135 public abstract Publisher<T> createFailedPublisher(); 136 137 /** 138 * Override and return lower value if your Publisher is only able to produce a known number of elements. 139 * For example, if it is designed to return at-most-one element, return {@code 1} from this method. 140 * 141 * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements. 142 * 143 * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE}, 144 * which will result in *skipping all tests which require an onComplete to be triggered* (!). 145 */ 146 public long maxElementsFromPublisher() { 147 return Long.MAX_VALUE - 1; 148 } 149 150 /** 151 * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a 152 * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of 153 * recursive calls to exceed the number returned by this method. 154 * 155 * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a> 156 * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion() 157 */ 158 public long boundedDepthOfOnNextAndRequestRecursion() { 159 return 1; 160 } 161 162 /** 163 * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}. 164 * Such tests MAY sometimes fail even though the impl 165 */ 166 public boolean skipStochasticTests() { 167 return false; 168 } 169 170 /** 171 * Describes the tested implementation in terms of how many subscribers they can support. 172 * Some tests require the {@code Publisher} under test to support multiple Subscribers, 173 * yet the spec does not require all publishers to be able to do so, thus – if an implementation 174 * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout") 175 * you MUST return that number from this method by overriding it. 176 */ 177 public long maxSupportedSubscribers() { 178 return Long.MAX_VALUE; 179 } 180 181 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 182 183 @BeforeMethod 184 public void setUp() throws Exception { 185 publisherVerification.setUp(); 186 subscriberVerification.setUp(); 187 } 188 189 ////////////////////// PUBLISHER RULES VERIFICATION /////////////////////////// 190 191 // A Processor 192 // must obey all Publisher rules on its publishing side 193 public Publisher<T> createPublisher(long elements) { 194 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 195 final Publisher<T> pub = createHelperPublisher(elements); 196 pub.subscribe(processor); 197 return processor; // we run the PublisherVerification against this 198 } 199 200 @Override @Test 201 public void required_validate_maxElementsFromPublisher() throws Exception { 202 publisherVerification.required_validate_maxElementsFromPublisher(); 203 } 204 205 @Override @Test 206 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 207 publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion(); 208 } 209 210 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER ////////////////////// 211 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 212 213 @Test 214 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 215 publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element(); 216 } 217 218 @Test 219 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 220 publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements(); 221 } 222 223 @Override @Test 224 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 225 publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements(); 226 } 227 228 @Override @Test 229 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 230 publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription(); 231 } 232 233 @Override @Test 234 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 235 publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially(); 236 } 237 238 @Override @Test 239 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 240 publisherVerification.optional_spec104_mustSignalOnErrorWhenFails(); 241 } 242 243 @Override @Test 244 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 245 publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates(); 246 } 247 248 @Override @Test 249 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 250 publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete(); 251 } 252 253 @Override @Test 254 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 255 publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled(); 256 } 257 258 @Override @Test 259 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable { 260 publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled(); 261 } 262 263 @Override @Test 264 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 265 publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled(); 266 } 267 268 @Override @Test 269 public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 270 publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals(); 271 } 272 273 @Override @Test 274 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 275 publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable(); 276 } 277 278 @Override @Test 279 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 280 publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber(); 281 } 282 283 @Override @Test 284 public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 285 publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); 286 } 287 288 @Override @Test 289 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 290 publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); 291 } 292 293 @Override @Test 294 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 295 publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(); 296 } 297 298 @Override @Test 299 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 300 publisherVerification.optional_spec111_maySupportMultiSubscribe(); 301 } 302 303 @Override @Test 304 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 305 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); 306 } 307 308 @Override @Test 309 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 310 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); 311 } 312 313 @Override @Test 314 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 315 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected(); 316 } 317 318 @Override @Test 319 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 320 publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe(); 321 } 322 323 @Override @Test 324 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 325 publisherVerification.required_spec303_mustNotAllowUnboundedRecursion(); 326 } 327 328 @Override @Test 329 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 330 publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations(); 331 } 332 333 @Override @Test 334 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception { 335 publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation(); 336 } 337 338 @Override @Test 339 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 340 publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops(); 341 } 342 343 @Override @Test 344 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 345 publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops(); 346 } 347 348 @Override @Test 349 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 350 publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException(); 351 } 352 353 @Override @Test 354 public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 355 publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException(); 356 } 357 358 @Override @Test 359 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 360 publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(); 361 } 362 363 @Override @Test 364 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 365 publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(); 366 } 367 368 @Override @Test 369 public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable { 370 publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue(); 371 } 372 373 @Override @Test 374 public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable { 375 publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue(); 376 } 377 378 @Override @Test 379 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 380 publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); 381 } 382 383 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4 384 // for multiple subscribers 385 @Test 386 public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable { 387 optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { 388 @Override 389 public TestSetup apply(Long aLong) throws Throwable { 390 return new TestSetup(env, processorBufferSize) {{ 391 final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env); 392 env.subscribe(processor, sub1); 393 394 final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env); 395 env.subscribe(processor, sub2); 396 397 sub1.request(1); 398 expectRequest(); 399 final T x = sendNextTFromUpstream(); 400 expectNextElement(sub1, x); 401 sub1.request(1); 402 403 // sub1 has received one element, and has one demand pending 404 // sub2 has not yet requested anything 405 406 final Exception ex = new RuntimeException("Test exception"); 407 sendError(ex); 408 sub1.expectError(ex); 409 sub2.expectError(ex); 410 411 env.verifyNoAsyncErrorsNoDelay(); 412 }}; 413 } 414 }); 415 } 416 417 ////////////////////// SUBSCRIBER RULES VERIFICATION /////////////////////////// 418 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 419 420 // A Processor 421 // must obey all Subscriber rules on its consuming side 422 public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) { 423 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 424 processor.subscribe( 425 new Subscriber<T>() { 426 private final Promise<Subscription> subs = new Promise<Subscription>(env); 427 428 @Override 429 public void onSubscribe(final Subscription subscription) { 430 env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription)); 431 if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification 432 433 probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() { 434 435 @Override 436 public void triggerRequest(long elements) { 437 subscription.request(elements); 438 } 439 440 @Override 441 public void signalCancel() { 442 subscription.cancel(); 443 } 444 }); 445 } 446 447 @Override 448 public void onNext(T element) { 449 env.debug(String.format("whiteboxSubscriber::onNext(%s)", element)); 450 probe.registerOnNext(element); 451 } 452 453 @Override 454 public void onComplete() { 455 env.debug("whiteboxSubscriber::onComplete()"); 456 probe.registerOnComplete(); 457 } 458 459 @Override 460 public void onError(Throwable cause) { 461 env.debug(String.format("whiteboxSubscriber::onError(%s)", cause)); 462 probe.registerOnError(cause); 463 } 464 }); 465 466 return processor; // we run the SubscriberVerification against this 467 } 468 469 ////////////////////// OTHER RULE VERIFICATION /////////////////////////// 470 471 // A Processor 472 // must immediately pass on `onError` events received from its upstream to its downstream 473 @Test 474 public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception { 475 new TestSetup(env, processorBufferSize) {{ 476 final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env); 477 env.subscribe(processor, sub); 478 479 final Exception ex = new RuntimeException("Test exception"); 480 sendError(ex); 481 sub.expectError(ex); // "immediately", i.e. without a preceding request 482 483 env.verifyNoAsyncErrorsNoDelay(); 484 }}; 485 } 486 487 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER ////////////////////// 488 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 489 490 @Test 491 public void required_exerciseWhiteboxHappyPath() throws Throwable { 492 subscriberVerification.required_exerciseWhiteboxHappyPath(); 493 } 494 495 @Override @Test 496 public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable { 497 subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest(); 498 } 499 500 @Override @Test 501 public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception { 502 subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch(); 503 } 504 505 @Override @Test 506 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 507 subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete(); 508 } 509 510 @Override @Test 511 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 512 subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError(); 513 } 514 515 @Override @Test 516 public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 517 subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError(); 518 } 519 520 @Override @Test 521 public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { 522 subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(); 523 } 524 525 @Override @Test 526 public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 527 subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid(); 528 } 529 530 @Override @Test 531 public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 532 subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization(); 533 } 534 535 @Override @Test 536 public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 537 subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel(); 538 } 539 540 @Override @Test 541 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 542 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall(); 543 } 544 545 @Override @Test 546 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 547 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall(); 548 } 549 550 @Override @Test 551 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 552 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall(); 553 } 554 555 @Override @Test 556 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 557 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall(); 558 } 559 560 @Override @Test 561 public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 562 subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents(); 563 } 564 565 @Override @Test 566 public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable { 567 subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation(); 568 } 569 570 @Override @Test 571 public void untested_spec213_failingOnSignalInvocation() throws Exception { 572 subscriberVerification.untested_spec213_failingOnSignalInvocation(); 573 } 574 575 @Override @Test 576 public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 577 subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull(); 578 } 579 @Override @Test 580 public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 581 subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull(); 582 } 583 @Override @Test 584 public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 585 subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull(); 586 } 587 588 @Override @Test 589 public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception { 590 subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext(); 591 } 592 593 @Override @Test 594 public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 595 subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(); 596 } 597 598 @Override @Test 599 public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 600 subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber(); 601 } 602 603 @Override @Test 604 public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 605 subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError(); 606 } 607 608 @Override @Test 609 public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 610 subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists(); 611 } 612 613 @Override @Test 614 public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 615 subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError(); 616 } 617 618 @Override @Test 619 public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 620 subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber(); 621 } 622 623 /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////// 624 625 // A Processor 626 // must trigger `requestFromUpstream` for elements that have been requested 'long ago' 627 @Test 628 public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable { 629 optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { 630 @Override 631 public TestSetup apply(Long subscribers) throws Throwable { 632 return new TestSetup(env, processorBufferSize) {{ 633 ManualSubscriber<T> sub1 = newSubscriber(); 634 sub1.request(20); 635 636 long totalRequests = expectRequest(); 637 final T x = sendNextTFromUpstream(); 638 expectNextElement(sub1, x); 639 640 if (totalRequests == 1) { 641 totalRequests += expectRequest(); 642 } 643 final T y = sendNextTFromUpstream(); 644 expectNextElement(sub1, y); 645 646 if (totalRequests == 2) { 647 totalRequests += expectRequest(); 648 } 649 650 final ManualSubscriber<T> sub2 = newSubscriber(); 651 652 // sub1 now has 18 pending 653 // sub2 has 0 pending 654 655 final T z = sendNextTFromUpstream(); 656 expectNextElement(sub1, z); 657 sub2.expectNone(); // since sub2 hasn't requested anything yet 658 659 sub2.request(1); 660 expectNextElement(sub2, z); 661 662 if (totalRequests == 3) { 663 expectRequest(); 664 } 665 666 // to avoid error messages during test harness shutdown 667 sendCompletion(); 668 sub1.expectCompletion(env.defaultTimeoutMillis()); 669 sub2.expectCompletion(env.defaultTimeoutMillis()); 670 671 env.verifyNoAsyncErrorsNoDelay(); 672 }}; 673 } 674 }); 675 } 676 677 /////////////////////// TEST INFRASTRUCTURE ////////////////////// 678 679 public void notVerified() { 680 publisherVerification.notVerified(); 681 } 682 683 public void notVerified(String message) { 684 publisherVerification.notVerified(message); 685 } 686 687 /** 688 * Test for feature that REQUIRES multiple subscribers to be supported by Publisher. 689 */ 690 public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable { 691 if (requiredSubscribersSupport > maxSupportedSubscribers()) 692 notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.", 693 maxSupportedSubscribers(), requiredSubscribersSupport)); 694 else body.apply(requiredSubscribersSupport); 695 } 696 697 public abstract class TestSetup extends ManualPublisher<T> { 698 final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values 699 private Set<T> seenTees = new HashSet<T>(); 700 701 final Processor<T, T> processor; 702 703 public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException { 704 super(env); 705 tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE)); 706 processor = createIdentityProcessor(testBufferSize); 707 subscribe(processor); 708 } 709 710 public ManualSubscriber<T> newSubscriber() throws InterruptedException { 711 return env.newManualSubscriber(processor); 712 } 713 714 public T nextT() throws InterruptedException { 715 final T t = tees.requestNextElement(); 716 if (seenTees.contains(t)) { 717 env.flop(String.format("Helper publisher illegally produced the same element %s twice", t)); 718 } 719 seenTees.add(t); 720 return t; 721 } 722 723 public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException { 724 final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected)); 725 if (!elem.equals(expected)) { 726 env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected)); 727 } 728 } 729 730 public T sendNextTFromUpstream() throws InterruptedException { 731 final T x = nextT(); 732 sendNext(x); 733 return x; 734 } 735 } 736 737 public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> { 738 Promise<Throwable> error; 739 740 public ManualSubscriberWithErrorCollection(TestEnvironment env) { 741 super(env); 742 error = new Promise<Throwable>(env); 743 } 744 745 @Override 746 public void onError(Throwable cause) { 747 error.complete(cause); 748 } 749 750 public void expectError(Throwable cause) throws InterruptedException { 751 expectError(cause, env.defaultTimeoutMillis()); 752 } 753 754 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 755 public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException { 756 error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream"); 757 if (!cause.equals(error.value())) { 758 env.flop(String.format("Expected error %s but got %s", cause, error.value())); 759 } 760 } 761 } 762}