001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Processor; 015import org.reactivestreams.Publisher; 016import org.reactivestreams.Subscriber; 017import org.reactivestreams.Subscription; 018import org.reactivestreams.tck.TestEnvironment.ManualPublisher; 019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; 021import org.reactivestreams.tck.TestEnvironment.Promise; 022import org.reactivestreams.tck.flow.support.Function; 023import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; 024import org.reactivestreams.tck.flow.support.PublisherVerificationRules; 025import org.testng.annotations.BeforeMethod; 026import org.testng.annotations.Test; 027 028import java.util.HashSet; 029import java.util.Set; 030 031public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T> 032 implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { 033 034 private final TestEnvironment env; 035 036 ////////////////////// DELEGATED TO SPECS ////////////////////// 037 038 // for delegating tests 039 private final SubscriberWhiteboxVerification<T> subscriberVerification; 040 041 // for delegating tests 042 private final PublisherVerification<T> publisherVerification; 043 044 ////////////////// END OF DELEGATED TO SPECS ////////////////// 045 046 // number of elements the processor under test must be able ot buffer, 047 // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`. 048 private final int processorBufferSize; 049 050 /** 051 * Test class must specify the expected time it takes for the publisher to 052 * shut itself down when the the last downstream {@code Subscription} is cancelled. 053 * 054 * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. 055 */ 056 @SuppressWarnings("unused") 057 public IdentityProcessorVerification(final TestEnvironment env) { 058 this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE); 059 } 060 061 /** 062 * Test class must specify the expected time it takes for the publisher to 063 * shut itself down when the the last downstream {@code Subscription} is cancelled. 064 * 065 * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. 066 * 067 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 068 */ 069 @SuppressWarnings("unused") 070 public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) { 071 this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE); 072 } 073 074 /** 075 * Test class must specify the expected time it takes for the publisher to 076 * shut itself down when the the last downstream {@code Subscription} is cancelled. 077 * 078 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 079 * @param processorBufferSize number of elements the processor is required to be able to buffer. 080 */ 081 public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) { 082 this.env = env; 083 this.processorBufferSize = processorBufferSize; 084 085 this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) { 086 @Override 087 public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) { 088 return IdentityProcessorVerification.this.createSubscriber(probe); 089 } 090 091 @Override public T createElement(int element) { 092 return IdentityProcessorVerification.this.createElement(element); 093 } 094 095 @Override 096 public Publisher<T> createHelperPublisher(long elements) { 097 return IdentityProcessorVerification.this.createHelperPublisher(elements); 098 } 099 }; 100 101 publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) { 102 @Override 103 public Publisher<T> createPublisher(long elements) { 104 return IdentityProcessorVerification.this.createPublisher(elements); 105 } 106 107 @Override 108 public Publisher<T> createFailedPublisher() { 109 return IdentityProcessorVerification.this.createFailedPublisher(); 110 } 111 112 @Override 113 public long maxElementsFromPublisher() { 114 return IdentityProcessorVerification.this.maxElementsFromPublisher(); 115 } 116 117 @Override 118 public long boundedDepthOfOnNextAndRequestRecursion() { 119 return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion(); 120 } 121 122 @Override 123 public boolean skipStochasticTests() { 124 return IdentityProcessorVerification.this.skipStochasticTests(); 125 } 126 }; 127 } 128 129 /** 130 * This is the main method you must implement in your test incarnation. 131 * It must create a {@link Processor}, which simply forwards all stream elements from its upstream 132 * to its downstream. It must be able to internally buffer the given number of elements. 133 * 134 * @param bufferSize number of elements the processor is required to be able to buffer. 135 */ 136 public abstract Processor<T, T> createIdentityProcessor(int bufferSize); 137 138 /** 139 * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. 140 * 141 * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, 142 * followed by signalling {@code onError} on it, as specified by Rule 1.9. 143 * 144 * If you want to ignore these additional tests, return {@code null} from this method. 145 */ 146 public abstract Publisher<T> createFailedPublisher(); 147 148 /** 149 * Override and return lower value if your Publisher is only able to produce a known number of elements. 150 * For example, if it is designed to return at-most-one element, return {@code 1} from this method. 151 * 152 * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements. 153 * 154 * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE}, 155 * which will result in *skipping all tests which require an onComplete to be triggered* (!). 156 */ 157 public long maxElementsFromPublisher() { 158 return Long.MAX_VALUE - 1; 159 } 160 161 /** 162 * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a 163 * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of 164 * recursive calls to exceed the number returned by this method. 165 * 166 * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a> 167 * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion() 168 */ 169 public long boundedDepthOfOnNextAndRequestRecursion() { 170 return 1; 171 } 172 173 /** 174 * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}. 175 * Such tests MAY sometimes fail even though the impl 176 */ 177 public boolean skipStochasticTests() { 178 return false; 179 } 180 181 /** 182 * Describes the tested implementation in terms of how many subscribers they can support. 183 * Some tests require the {@code Publisher} under test to support multiple Subscribers, 184 * yet the spec does not require all publishers to be able to do so, thus – if an implementation 185 * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout") 186 * you MUST return that number from this method by overriding it. 187 */ 188 public long maxSupportedSubscribers() { 189 return Long.MAX_VALUE; 190 } 191 192 /** 193 * Override this method and return {@code true} if the {@link Processor} returned by the 194 * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s 195 * request amounts and only delivers onNext signals if all Subscribers have 196 * indicated (via their Subscription#request(long)) they are ready to receive elements. 197 */ 198 public boolean doesCoordinatedEmission() { 199 return false; 200 } 201 202 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 203 204 @BeforeMethod 205 public void setUp() throws Exception { 206 publisherVerification.setUp(); 207 subscriberVerification.setUp(); 208 } 209 210 ////////////////////// PUBLISHER RULES VERIFICATION /////////////////////////// 211 212 // A Processor 213 // must obey all Publisher rules on its publishing side 214 public Publisher<T> createPublisher(long elements) { 215 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 216 final Publisher<T> pub = createHelperPublisher(elements); 217 pub.subscribe(processor); 218 return processor; // we run the PublisherVerification against this 219 } 220 221 @Override @Test 222 public void required_validate_maxElementsFromPublisher() throws Exception { 223 publisherVerification.required_validate_maxElementsFromPublisher(); 224 } 225 226 @Override @Test 227 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 228 publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion(); 229 } 230 231 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER ////////////////////// 232 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 233 234 @Test 235 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 236 publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element(); 237 } 238 239 @Test 240 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 241 publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements(); 242 } 243 244 @Override @Test 245 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 246 publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements(); 247 } 248 249 @Override @Test 250 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 251 publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription(); 252 } 253 254 @Override @Test 255 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 256 publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially(); 257 } 258 259 @Override @Test 260 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 261 publisherVerification.optional_spec104_mustSignalOnErrorWhenFails(); 262 } 263 264 @Override @Test 265 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 266 publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates(); 267 } 268 269 @Override @Test 270 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 271 publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete(); 272 } 273 274 @Override @Test 275 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 276 publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled(); 277 } 278 279 @Override @Test 280 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable { 281 publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled(); 282 } 283 284 @Override @Test 285 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 286 publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled(); 287 } 288 289 @Override @Test 290 public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 291 publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals(); 292 } 293 294 @Override @Test 295 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 296 publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable(); 297 } 298 299 @Override @Test 300 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 301 publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber(); 302 } 303 304 @Override @Test 305 public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 306 publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); 307 } 308 309 @Override @Test 310 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 311 publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); 312 } 313 314 @Override @Test 315 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 316 publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(); 317 } 318 319 @Override @Test 320 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 321 publisherVerification.optional_spec111_maySupportMultiSubscribe(); 322 } 323 324 @Override @Test 325 public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable { 326 publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals(); 327 } 328 329 @Override @Test 330 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 331 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); 332 } 333 334 @Override @Test 335 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 336 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); 337 } 338 339 @Override @Test 340 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 341 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected(); 342 } 343 344 @Override @Test 345 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 346 publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe(); 347 } 348 349 @Override @Test 350 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 351 publisherVerification.required_spec303_mustNotAllowUnboundedRecursion(); 352 } 353 354 @Override @Test 355 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 356 publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations(); 357 } 358 359 @Override @Test 360 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception { 361 publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation(); 362 } 363 364 @Override @Test 365 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 366 publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops(); 367 } 368 369 @Override @Test 370 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 371 publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops(); 372 } 373 374 @Override @Test 375 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 376 publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException(); 377 } 378 379 @Override @Test 380 public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 381 publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException(); 382 } 383 384 @Override @Test 385 public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable { 386 publisherVerification.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage(); 387 } 388 389 @Override @Test 390 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 391 publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(); 392 } 393 394 @Override @Test 395 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 396 publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(); 397 } 398 399 @Override @Test 400 public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable { 401 publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue(); 402 } 403 404 @Override @Test 405 public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable { 406 publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue(); 407 } 408 409 @Override @Test 410 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 411 publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); 412 } 413 414 415 /** 416 * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s 417 * receive the same items and a terminal {@code Exception}. 418 * <p> 419 * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, 420 * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. 421 * <p> 422 * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple 423 * {@code Subscriber}s. 424 * <p> 425 * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. 426 * <p> 427 * If this test fails, the following could be checked within the {@code Processor} implementation: 428 * <ul> 429 * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li> 430 * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or 431 * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s 432 * both have to request first.</li> 433 * </ul> 434 */ 435 @Test 436 public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable { 437 optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { 438 @Override 439 public TestSetup apply(Long aLong) throws Throwable { 440 return new TestSetup(env, processorBufferSize) {{ 441 final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env); 442 env.subscribe(processor, sub1); 443 444 final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env); 445 env.subscribe(processor, sub2); 446 447 final Exception ex = new RuntimeException("Test exception"); 448 449 if (doesCoordinatedEmission()) { 450 sub1.request(1); 451 sub2.request(1); 452 453 expectRequest(); 454 455 final T x = sendNextTFromUpstream(); 456 457 expectNextElement(sub1, x); 458 expectNextElement(sub2, x); 459 460 sub1.request(1); 461 sub2.request(1); 462 } else { 463 sub1.request(1); 464 465 expectRequest(env.defaultTimeoutMillis(), 466 "If the Processor coordinates requests/emissions when having multiple Subscribers" 467 + " at once, please override doesCoordinatedEmission() to return true in this " 468 + "IdentityProcessorVerification to allow this test to pass."); 469 470 final T x = sendNextTFromUpstream(); 471 expectNextElement(sub1, x, 472 "If the Processor coordinates requests/emissions when having multiple Subscribers" 473 + " at once, please override doesCoordinatedEmission() to return true in this " 474 + "IdentityProcessorVerification to allow this test to pass."); 475 476 sub1.request(1); 477 478 // sub1 has received one element, and has one demand pending 479 // sub2 has not yet requested anything 480 } 481 sendError(ex); 482 483 sub1.expectError(ex); 484 sub2.expectError(ex); 485 486 env.verifyNoAsyncErrorsNoDelay(); 487 }}; 488 } 489 }); 490 } 491 492 ////////////////////// SUBSCRIBER RULES VERIFICATION /////////////////////////// 493 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 494 495 // A Processor 496 // must obey all Subscriber rules on its consuming side 497 public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) { 498 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 499 processor.subscribe( 500 new Subscriber<T>() { 501 private final Promise<Subscription> subs = new Promise<Subscription>(env); 502 503 @Override 504 public void onSubscribe(final Subscription subscription) { 505 env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription)); 506 if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification 507 508 probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() { 509 510 @Override 511 public void triggerRequest(long elements) { 512 subscription.request(elements); 513 } 514 515 @Override 516 public void signalCancel() { 517 subscription.cancel(); 518 } 519 }); 520 } 521 522 @Override 523 public void onNext(T element) { 524 env.debug(String.format("whiteboxSubscriber::onNext(%s)", element)); 525 probe.registerOnNext(element); 526 } 527 528 @Override 529 public void onComplete() { 530 env.debug("whiteboxSubscriber::onComplete()"); 531 probe.registerOnComplete(); 532 } 533 534 @Override 535 public void onError(Throwable cause) { 536 env.debug(String.format("whiteboxSubscriber::onError(%s)", cause)); 537 probe.registerOnError(cause); 538 } 539 }); 540 541 return processor; // we run the SubscriberVerification against this 542 } 543 544 ////////////////////// OTHER RULE VERIFICATION /////////////////////////// 545 546 // A Processor 547 // must immediately pass on `onError` events received from its upstream to its downstream 548 @Test 549 public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception { 550 new TestSetup(env, processorBufferSize) {{ 551 final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env); 552 env.subscribe(processor, sub); 553 554 final Exception ex = new RuntimeException("Test exception"); 555 sendError(ex); 556 sub.expectError(ex); // "immediately", i.e. without a preceding request 557 558 env.verifyNoAsyncErrorsNoDelay(); 559 }}; 560 } 561 562 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER ////////////////////// 563 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 564 565 @Test 566 public void required_exerciseWhiteboxHappyPath() throws Throwable { 567 subscriberVerification.required_exerciseWhiteboxHappyPath(); 568 } 569 570 @Override @Test 571 public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable { 572 subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest(); 573 } 574 575 @Override @Test 576 public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception { 577 subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch(); 578 } 579 580 @Override @Test 581 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 582 subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete(); 583 } 584 585 @Override @Test 586 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 587 subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError(); 588 } 589 590 @Override @Test 591 public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 592 subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError(); 593 } 594 595 @Override @Test 596 public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { 597 subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(); 598 } 599 600 @Override @Test 601 public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 602 subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid(); 603 } 604 605 @Override @Test 606 public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 607 subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization(); 608 } 609 610 @Override @Test 611 public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 612 subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel(); 613 } 614 615 @Override @Test 616 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 617 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall(); 618 } 619 620 @Override @Test 621 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 622 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall(); 623 } 624 625 @Override @Test 626 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 627 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall(); 628 } 629 630 @Override @Test 631 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 632 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall(); 633 } 634 635 @Override @Test 636 public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 637 subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents(); 638 } 639 640 @Override @Test 641 public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable { 642 subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation(); 643 } 644 645 @Override @Test 646 public void untested_spec213_failingOnSignalInvocation() throws Exception { 647 subscriberVerification.untested_spec213_failingOnSignalInvocation(); 648 } 649 650 @Override @Test 651 public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 652 subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull(); 653 } 654 @Override @Test 655 public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 656 subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull(); 657 } 658 @Override @Test 659 public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 660 subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull(); 661 } 662 663 @Override @Test 664 public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception { 665 subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext(); 666 } 667 668 @Override @Test 669 public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 670 subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(); 671 } 672 673 @Override @Test 674 public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 675 subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber(); 676 } 677 678 @Override @Test 679 public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 680 subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError(); 681 } 682 683 @Override @Test 684 public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 685 subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists(); 686 } 687 688 @Override @Test 689 public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 690 subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError(); 691 } 692 693 @Override @Test 694 public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 695 subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber(); 696 } 697 698 /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////// 699 700 /** 701 * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests 702 * from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}. 703 * <p> 704 * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, 705 * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. 706 * <p> 707 * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>2.1</a> with multiple 708 * {@code Subscriber}s. 709 * <p> 710 * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. 711 * <p> 712 * If this test fails, the following could be checked within the {@code Processor} implementation: 713 * <ul> 714 * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li> 715 * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or 716 * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s 717 * both have to request first.</li> 718 * </ul> 719 */ 720 @Test 721 public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable { 722 optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { 723 @Override 724 public TestSetup apply(Long subscribers) throws Throwable { 725 return new TestSetup(env, processorBufferSize) {{ 726 ManualSubscriber<T> sub1 = newSubscriber(); 727 sub1.request(20); 728 729 long totalRequests = expectRequest(); 730 final T x = sendNextTFromUpstream(); 731 expectNextElement(sub1, x); 732 733 if (totalRequests == 1) { 734 totalRequests += expectRequest(); 735 } 736 final T y = sendNextTFromUpstream(); 737 expectNextElement(sub1, y); 738 739 if (totalRequests == 2) { 740 totalRequests += expectRequest(); 741 } 742 743 final ManualSubscriber<T> sub2 = newSubscriber(); 744 745 // sub1 now has 18 pending 746 // sub2 has 0 pending 747 748 if (doesCoordinatedEmission()) { 749 sub2.expectNone(); // since sub2 hasn't requested anything yet 750 751 sub2.request(1); 752 753 final T z = sendNextTFromUpstream(); 754 expectNextElement(sub1, z); 755 expectNextElement(sub2, z); 756 } else { 757 final T z = sendNextTFromUpstream(); 758 expectNextElement(sub1, z, 759 "If the Processor coordinates requests/emissions when having multiple Subscribers" 760 + " at once, please override doesCoordinatedEmission() to return true in this " 761 + "IdentityProcessorVerification to allow this test to pass."); 762 sub2.expectNone(); // since sub2 hasn't requested anything yet 763 764 sub2.request(1); 765 expectNextElement(sub2, z); 766 } 767 if (totalRequests == 3) { 768 expectRequest(); 769 } 770 771 // to avoid error messages during test harness shutdown 772 sendCompletion(); 773 sub1.expectCompletion(env.defaultTimeoutMillis()); 774 sub2.expectCompletion(env.defaultTimeoutMillis()); 775 776 env.verifyNoAsyncErrorsNoDelay(); 777 }}; 778 } 779 }); 780 } 781 782 /////////////////////// TEST INFRASTRUCTURE ////////////////////// 783 784 public void notVerified() { 785 publisherVerification.notVerified(); 786 } 787 788 public void notVerified(String message) { 789 publisherVerification.notVerified(message); 790 } 791 792 /** 793 * Test for feature that REQUIRES multiple subscribers to be supported by Publisher. 794 */ 795 public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable { 796 if (requiredSubscribersSupport > maxSupportedSubscribers()) 797 notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.", 798 maxSupportedSubscribers(), requiredSubscribersSupport)); 799 else body.apply(requiredSubscribersSupport); 800 } 801 802 public abstract class TestSetup extends ManualPublisher<T> { 803 final private ManualSubscriber<T> tees; // gives us access to an infinite stream of T values 804 private Set<T> seenTees = new HashSet<T>(); 805 806 final Processor<T, T> processor; 807 808 public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException { 809 super(env); 810 tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE)); 811 processor = createIdentityProcessor(testBufferSize); 812 subscribe(processor); 813 } 814 815 public ManualSubscriber<T> newSubscriber() throws InterruptedException { 816 return env.newManualSubscriber(processor); 817 } 818 819 public T nextT() throws InterruptedException { 820 final T t = tees.requestNextElement(); 821 if (seenTees.contains(t)) { 822 env.flop(String.format("Helper publisher illegally produced the same element %s twice", t)); 823 } 824 seenTees.add(t); 825 return t; 826 } 827 828 public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException { 829 final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected)); 830 if (!elem.equals(expected)) { 831 env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected)); 832 } 833 } 834 835 public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException { 836 final T elem = sub.nextElement(String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum)); 837 if (!elem.equals(expected)) { 838 env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected)); 839 } 840 } 841 842 public T sendNextTFromUpstream() throws InterruptedException { 843 final T x = nextT(); 844 sendNext(x); 845 return x; 846 } 847 } 848 849 public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> { 850 Promise<Throwable> error; 851 852 public ManualSubscriberWithErrorCollection(TestEnvironment env) { 853 super(env); 854 error = new Promise<Throwable>(env); 855 } 856 857 @Override 858 public void onError(Throwable cause) { 859 error.complete(cause); 860 } 861 862 public void expectError(Throwable cause) throws InterruptedException { 863 expectError(cause, env.defaultTimeoutMillis()); 864 } 865 866 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 867 public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException { 868 error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream"); 869 if (!cause.equals(error.value())) { 870 env.flop(String.format("Expected error %s but got %s", cause, error.value())); 871 } 872 } 873 } 874}