/************************************************************************
 * Licensed under Public Domain (CC0)                                    *
 *                                                                       *
 * To the extent possible under law, the person who associated CC0 with  *
 * this code has waived all copyright and related or neighboring         *
 * rights to this code.                                                  *
 *                                                                       *
 * You should have received a copy of the CC0 legalcode along with this  *
 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
 ************************************************************************/

package org.reactivestreams.tck;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport;
import org.reactivestreams.tck.TestEnvironment.Promise;
import org.reactivestreams.tck.support.Function;
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
import org.reactivestreams.tck.support.PublisherVerificationRules;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.HashSet;
import java.util.Set;

/**
 * Verification harness for an identity {@link Processor}: a processor that forwards every element it
 * receives from its upstream to its downstream unchanged.
 *
 * The processor is checked from both sides by delegation: its publishing side is handed to a
 * {@link PublisherVerification} and its subscribing side to a {@link SubscriberWhiteboxVerification}.
 * Almost every {@code @Test} method in this class simply forwards to the same-named method on one of
 * those two delegates; a few additional processor-specific tests are implemented directly here.
 *
 * @param <T> type of the elements flowing through the processor under test
 */
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
  implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {

  private final TestEnvironment env;

  ////////////////////// DELEGATED TO SPECS //////////////////////

  // for delegating tests
  private final SubscriberWhiteboxVerification<T> subscriberVerification;

  // for delegating tests
  private final PublisherVerification<T> publisherVerification;

  ////////////////// END OF DELEGATED TO SPECS //////////////////

  // number of elements the processor under test must be able to buffer,
  // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`.
  private final int processorBufferSize;

  /**
   * Creates the verification with the publisher-reference GC timeout obtained from
   * {@code PublisherVerification.envPublisherReferenceGCTimeoutMillis()}.
   *
   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
   *
   * @param env test environment shared with the delegate verifications
   */
  @SuppressWarnings("unused")
  public IdentityProcessorVerification(final TestEnvironment env) {
    this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE);
  }

  /**
   * Test class must specify the expected time it takes for the publisher to
   * shut itself down when the last downstream {@code Subscription} is cancelled.
   *
   * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements.
   *
   * @param env test environment shared with the delegate verifications
   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
   */
  @SuppressWarnings("unused")
  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
    this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE);
  }

  /**
   * Test class must specify the expected time it takes for the publisher to
   * shut itself down when the last downstream {@code Subscription} is cancelled.
   *
   * @param env test environment shared with the delegate verifications
   * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher.
   * @param processorBufferSize            number of elements the processor is required to be able to buffer.
   */
  public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) {
    this.env = env;
    this.processorBufferSize = processorBufferSize;

    // Subscriber-side delegate: every factory method is routed back to this outer instance.
    this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) {
      @Override
      public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
        return IdentityProcessorVerification.this.createSubscriber(probe);
      }

      @Override public T createElement(int element) {
        return IdentityProcessorVerification.this.createElement(element);
      }

      @Override
      public Publisher<T> createHelperPublisher(long elements) {
        return IdentityProcessorVerification.this.createHelperPublisher(elements);
      }
    };

    // Publisher-side delegate: factories and tuning knobs are routed back to this outer instance.
    publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) {
      @Override
      public Publisher<T> createPublisher(long elements) {
        return IdentityProcessorVerification.this.createPublisher(elements);
      }

      @Override
      public Publisher<T> createFailedPublisher() {
        return IdentityProcessorVerification.this.createFailedPublisher();
      }

      @Override
      public long maxElementsFromPublisher() {
        return IdentityProcessorVerification.this.maxElementsFromPublisher();
      }

      @Override
      public long boundedDepthOfOnNextAndRequestRecursion() {
        return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
      }

      @Override
      public boolean skipStochasticTests() {
        return IdentityProcessorVerification.this.skipStochasticTests();
      }
    };
  }

  /**
   * This is the main method you must implement in your test incarnation.
   * It must create a {@link Processor}, which simply forwards all stream elements from its upstream
   * to its downstream. It must be able to internally buffer the given number of elements.
   *
   * @param bufferSize number of elements the processor is required to be able to buffer.
   */
  public abstract Processor<T, T> createIdentityProcessor(int bufferSize);

  /**
   * By implementing this method, additional TCK tests concerning "failed" publishers will be run.
   *
   * The expected behaviour of the {@link Publisher} returned by this method is to hand out a subscription,
   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
   *
   * If you want to ignore these additional tests, return {@code null} from this method.
   */
  public abstract Publisher<T> createFailedPublisher();

  /**
   * Override and return lower value if your Publisher is only able to produce a known number of elements.
   * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
   *
   * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can produce a huge but NOT an unbounded number of elements.
   *
   * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE},
   * which will result in *skipping all tests which require an onComplete to be triggered* (!).
   */
  public long maxElementsFromPublisher() {
    return Long.MAX_VALUE - 1;
  }

  /**
   * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a
   * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of
   * recursive calls to exceed the number returned by this method.
   *
   * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a>
   * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion()
   */
  public long boundedDepthOfOnNextAndRequestRecursion() {
    return 1;
  }

  /**
   * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
   * Such tests MAY sometimes fail even though the implementation under test is correct.
   *
   * NOTE(review): the original sentence was truncated after "impl"; the ending above is completed
   * from context and should be confirmed.
   */
  public boolean skipStochasticTests() {
    return false;
  }

  /**
   * Describes the tested implementation in terms of how many subscribers they can support.
   * Some tests require the {@code Publisher} under test to support multiple Subscribers,
   * yet the spec does not require all publishers to be able to do so, thus – if an implementation
   * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout")
   * you MUST return that number from this method by overriding it.
   */
  public long maxSupportedSubscribers() {
    return Long.MAX_VALUE;
  }

  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////

  /** Runs the {@code setUp()} of both delegate verifications before each test method. */
  @BeforeMethod
  public void setUp() throws Exception {
    publisherVerification.setUp();
    subscriberVerification.setUp();
  }

  ////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////

  // A Processor
  //   must obey all Publisher rules on its publishing side
  /**
   * Creates a fresh identity processor, subscribes it to a helper publisher of {@code elements}
   * elements, and returns the processor so the publisher rules are verified against it.
   *
   * @param elements number of elements the helper publisher is asked to produce
   */
  public Publisher<T> createPublisher(long elements) {
    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
    final Publisher<T> pub = createHelperPublisher(elements);
    pub.subscribe(processor);
    return processor; // we run the PublisherVerification against this
  }

  @Override @Test
  public void required_validate_maxElementsFromPublisher() throws Exception {
    publisherVerification.required_validate_maxElementsFromPublisher();
  }

  @Override @Test
  public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
    publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion();
  }

  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1

  @Test
  public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
    publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element();
  }

  @Test
  public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
    publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements();
  }

  @Override @Test
  public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
    publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
  }

  @Override @Test
  public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
    publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription();
  }

  @Override @Test
  public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
    publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially();
  }

  @Override @Test
  public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
    publisherVerification.optional_spec104_mustSignalOnErrorWhenFails();
  }

  @Override @Test
  public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
    publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
  }

  @Override @Test
  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable {
    publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
  }

  @Override @Test
  public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
    publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();
  }

  @Override @Test
  public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
    publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled();
  }

  @Override @Test
  public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
    publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled();
  }

  @Override @Test
  public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
    publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals();
  }

  @Override @Test
  public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
    publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable();
  }

  @Override @Test
  public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
    publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
  }

  @Override @Test
  public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
    publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
  }

  @Override @Test
  public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
    publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
  }

  @Override @Test
  public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
    publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
  }

  @Override @Test
  public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
    publisherVerification.optional_spec111_maySupportMultiSubscribe();
  }

  @Override @Test
  public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
    publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals();
  }

  @Override @Test
  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
  }

  @Override @Test
  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
  }

  @Override @Test
  public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
    publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
  }

  @Override @Test
  public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
    publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe();
  }

  @Override @Test
  public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable {
    publisherVerification.required_spec303_mustNotAllowUnboundedRecursion();
  }

  @Override @Test
  public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception {
    publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations();
  }

  @Override @Test
  public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception {
    publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation();
  }

  @Override @Test
  public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
    publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops();
  }

  @Override @Test
  public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
    publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops();
  }

  @Override @Test
  public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
    publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException();
  }

  @Override @Test
  public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
    publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException();
  }

  @Override @Test
  public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable {
    publisherVerification.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage();
  }

  @Override @Test
  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
    publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
  }

  @Override @Test
  public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
    publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
  }

  @Override @Test
  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
    publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue();
  }

  @Override @Test
  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
    publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
  }

  @Override @Test
  public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
    publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4
  // for multiple subscribers
  @Test
  public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
      @Override
      public TestSetup apply(Long aLong) throws Throwable {
        // The test body runs inside the instance initializer of the anonymous TestSetup.
        return new TestSetup(env, processorBufferSize) {{
          final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
          env.subscribe(processor, sub1);

          final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
          env.subscribe(processor, sub2);

          sub1.request(1);
          expectRequest();
          final T x = sendNextTFromUpstream();
          expectNextElement(sub1, x);
          sub1.request(1);

          // sub1 has received one element, and has one demand pending
          // sub2 has not yet requested anything

          final Exception ex = new RuntimeException("Test exception");
          sendError(ex);
          sub1.expectError(ex);
          sub2.expectError(ex);

          env.verifyNoAsyncErrorsNoDelay();
        }};
      }
    });
  }

  ////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1

  // A Processor
  //   must obey all Subscriber rules on its consuming side
  /**
   * Creates a fresh identity processor and attaches a downstream subscriber to it which reports
   * every signal it receives to the given whitebox {@code probe}. The processor itself is returned
   * so that the subscriber rules are verified against its consuming side.
   *
   * @param probe receives the downstream signals and the puppet used to drive demand / cancellation
   */
  public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) {
    final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
    processor.subscribe(
      new Subscriber<T>() {
        private final Promise<Subscription> subs = new Promise<Subscription>(env);

        @Override
        public void onSubscribe(final Subscription subscription) {
          env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
          // the Probe must also pass subscriber verification
          // NOTE(review): `subs` is never completed anywhere in this anonymous class, so this guard
          // presumably never fires; confirm whether the first subscription was meant to be recorded.
          if (subs.isCompleted()) {
            subscription.cancel();
          }

          probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {

            @Override
            public void triggerRequest(long elements) {
              subscription.request(elements);
            }

            @Override
            public void signalCancel() {
              subscription.cancel();
            }
          });
        }

        @Override
        public void onNext(T element) {
          env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
          probe.registerOnNext(element);
        }

        @Override
        public void onComplete() {
          env.debug("whiteboxSubscriber::onComplete()");
          probe.registerOnComplete();
        }

        @Override
        public void onError(Throwable cause) {
          env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
          probe.registerOnError(cause);
        }
      });

    return processor; // we run the SubscriberVerification against this
  }

  ////////////////////// OTHER RULE VERIFICATION ///////////////////////////

  // A Processor
  //   must immediately pass on `onError` events received from its upstream to its downstream
  @Test
  public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
    // The test body runs inside the instance initializer of the anonymous TestSetup.
    new TestSetup(env, processorBufferSize) {{
      final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
      env.subscribe(processor, sub);

      final Exception ex = new RuntimeException("Test exception");
      sendError(ex);
      sub.expectError(ex); // "immediately", i.e. without a preceding request

      env.verifyNoAsyncErrorsNoDelay();
    }};
  }

  /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1

  @Test
  public void required_exerciseWhiteboxHappyPath() throws Throwable {
    subscriberVerification.required_exerciseWhiteboxHappyPath();
  }

  @Override @Test
  public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
    subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest();
  }

  @Override @Test
  public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception {
    subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch();
  }

  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete();
  }

  @Override @Test
  public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
    subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError();
  }

  @Override @Test
  public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
    subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError();
  }

  @Override @Test
  public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable {
    subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
  }

  @Override @Test
  public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
    subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid();
  }

  @Override @Test
  public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
    subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization();
  }

  @Override @Test
  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
    subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
  }

  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
  }

  @Override @Test
  public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall();
  }

  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall();
  }

  @Override @Test
  public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
    subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall();
  }

  @Override @Test
  public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
    subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
  }

  @Override @Test
  public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
    subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation();
  }

  @Override @Test
  public void untested_spec213_failingOnSignalInvocation() throws Exception {
    subscriberVerification.untested_spec213_failingOnSignalInvocation();
  }

  @Override @Test
  public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull();
  }

  @Override @Test
  public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull();
  }

  @Override @Test
  public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
    subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull();
  }

  @Override @Test
  public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
    subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext();
  }

  @Override @Test
  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
    subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
  }

  @Override @Test
  public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
    subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber();
  }

  @Override @Test
  public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
    subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
  }

  @Override @Test
  public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
    subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
  }

  @Override @Test
  public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
    subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError();
  }

  @Override @Test
  public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
    subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber();
  }

  /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////

  // A Processor
  //   must trigger `requestFromUpstream` for elements that have been requested 'long ago'
  @Test
  public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
      @Override
      public TestSetup apply(Long subscribers) throws Throwable {
        // The test body runs inside the instance initializer of the anonymous TestSetup.
        return new TestSetup(env, processorBufferSize) {{
          ManualSubscriber<T> sub1 = newSubscriber();
          sub1.request(20);

          // The processor may request from upstream one element at a time or in a batch;
          // only wait for a further request when the demand seen so far has been used up.
          long totalRequests = expectRequest();
          final T x = sendNextTFromUpstream();
          expectNextElement(sub1, x);

          if (totalRequests == 1) {
            totalRequests += expectRequest();
          }
          final T y = sendNextTFromUpstream();
          expectNextElement(sub1, y);

          if (totalRequests == 2) {
            totalRequests += expectRequest();
          }

          final ManualSubscriber<T> sub2 = newSubscriber();

          // sub1 now has 18 pending
          // sub2 has 0 pending

          final T z = sendNextTFromUpstream();
          expectNextElement(sub1, z);
          sub2.expectNone(); // since sub2 hasn't requested anything yet

          sub2.request(1);
          expectNextElement(sub2, z);

          if (totalRequests == 3) {
            expectRequest();
          }

          // to avoid error messages during test harness shutdown
          sendCompletion();
          sub1.expectCompletion(env.defaultTimeoutMillis());
          sub2.expectCompletion(env.defaultTimeoutMillis());

          env.verifyNoAsyncErrorsNoDelay();
        }};
      }
    });
  }

  /////////////////////// TEST INFRASTRUCTURE //////////////////////

  /** Delegates to {@code publisherVerification.notVerified()}. */
  public void notVerified() {
    publisherVerification.notVerified();
  }

  /** Delegates to {@code publisherVerification.notVerified(message)}. */
  public void notVerified(String message) {
    publisherVerification.notVerified(message);
  }

  /**
   * Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
   *
   * If {@link #maxSupportedSubscribers()} is lower than {@code requiredSubscribersSupport} the test
   * is reported through {@link #notVerified(String)} and {@code body} is not invoked.
   *
   * @param requiredSubscribersSupport number of subscribers the test body needs
   * @param body                       test body; invoked with {@code requiredSubscribersSupport}
   */
  public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
    if (requiredSubscribersSupport > maxSupportedSubscribers()) {
      notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.",
                                maxSupportedSubscribers(), requiredSubscribersSupport));
    } else {
      body.apply(requiredSubscribersSupport);
    }
  }

  /**
   * Fixture for the processor-specific tests: a {@link ManualPublisher} acting as the upstream of a
   * freshly created identity processor, plus a source of distinct {@code T} values to push through it.
   */
  public abstract class TestSetup extends ManualPublisher<T> {
    private final ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
    private final Set<T> seenTees = new HashSet<T>();

    final Processor<T, T> processor;

    /**
     * Subscribes to a helper publisher for test values, creates the processor under test and
     * subscribes it to this manual publisher.
     *
     * @param env            test environment
     * @param testBufferSize buffer size passed to {@code createIdentityProcessor}
     */
    public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
      super(env);
      tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
      processor = createIdentityProcessor(testBufferSize);
      subscribe(processor);
    }

    /** Attaches a new manual subscriber to the downstream side of the processor. */
    public ManualSubscriber<T> newSubscriber() throws InterruptedException {
      return env.newManualSubscriber(processor);
    }

    /**
     * Pulls the next value from the helper publisher, flopping the test if that value
     * has already been handed out by this setup.
     */
    public T nextT() throws InterruptedException {
      final T t = tees.requestNextElement();
      // Set.add returns false when the element is already present, i.e. a duplicate.
      if (!seenTees.add(t)) {
        env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
      }
      return t;
    }

    /**
     * Awaits the next element on {@code sub} and flops the test unless it equals {@code expected}.
     */
    public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException {
      final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected));
      // Null-safe comparison so a missing element is reported as a mismatch rather than an NPE.
      if (elem == null || !elem.equals(expected)) {
        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
      }
    }

    /** Takes the next fresh value, sends it from upstream via {@code sendNext}, and returns it. */
    public T sendNextTFromUpstream() throws InterruptedException {
      final T x = nextT();
      sendNext(x);
      return x;
    }
  }

  /**
   * Manual subscriber that captures the {@code onError} signal in a {@link Promise} so tests can
   * await it and compare it against the expected cause.
   */
  public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
    Promise<Throwable> error;

    public ManualSubscriberWithErrorCollection(TestEnvironment env) {
      super(env);
      error = new Promise<Throwable>(env);
    }

    @Override
    public void onError(Throwable cause) {
      error.complete(cause);
    }

    /** Same as {@link #expectError(Throwable, long)} using {@code env.defaultTimeoutMillis()}. */
    public void expectError(Throwable cause) throws InterruptedException {
      expectError(cause, env.defaultTimeoutMillis());
    }

    /**
     * Awaits the error promise for up to {@code timeoutMillis} and flops the test unless the
     * received error equals {@code cause}.
     */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
      error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
      if (!cause.equals(error.value())) {
        env.flop(String.format("Expected error %s but got %s", cause, error.value()));
      }
    }
  }
}